[DistGB] enable dist partition pipeline to save FusedCSCSamplingGraph partition directly (#7728)

Co-authored-by: Ubuntu <ubuntu@ip-172-31-8-126.us-west-2.compute.internal>
Co-authored-by: Ubuntu <ubuntu@ip-172-31-52-174.us-west-2.compute.internal>
Co-authored-by: Rhett Ying <85214957+Rhett-Ying@users.noreply.github.com>
This commit is contained in:
Wenxuan Cao
2024-09-19 17:05:11 +08:00
committed by GitHub
parent d3453c3f14
commit 3bc8e228fc
7 changed files with 1520 additions and 88 deletions

View File

@@ -1600,8 +1600,6 @@ def _save_graph_gb(part_config, part_id, csc_graph):
def cast_various_to_minimum_dtype_gb( def cast_various_to_minimum_dtype_gb(
graph,
part_meta,
num_parts, num_parts,
indptr, indptr,
indices, indices,
@@ -1610,25 +1608,43 @@ def cast_various_to_minimum_dtype_gb(
ntypes, ntypes,
node_attributes, node_attributes,
edge_attributes, edge_attributes,
part_meta=None,
graph=None,
edge_count=None,
node_count=None,
tot_edge_count=None,
tot_node_count=None,
): ):
"""Cast various data to minimum dtype.""" """Cast various data to minimum dtype."""
if graph is not None:
assert part_meta is not None
tot_edge_count = graph.num_edges()
tot_node_count = graph.num_nodes()
node_count = part_meta["num_nodes"]
edge_count = part_meta["num_edges"]
else:
assert tot_edge_count is not None
assert tot_node_count is not None
assert edge_count is not None
assert node_count is not None
# Cast 1: indptr. # Cast 1: indptr.
indptr = _cast_to_minimum_dtype(graph.num_edges(), indptr) indptr = _cast_to_minimum_dtype(tot_edge_count, indptr)
# Cast 2: indices. # Cast 2: indices.
indices = _cast_to_minimum_dtype(graph.num_nodes(), indices) indices = _cast_to_minimum_dtype(tot_node_count, indices)
# Cast 3: type_per_edge. # Cast 3: type_per_edge.
type_per_edge = _cast_to_minimum_dtype( type_per_edge = _cast_to_minimum_dtype(
len(etypes), type_per_edge, field=ETYPE len(etypes), type_per_edge, field=ETYPE
) )
# Cast 4: node/edge_attributes. # Cast 4: node/edge_attributes.
predicates = { predicates = {
NID: part_meta["num_nodes"], NID: node_count,
"part_id": num_parts, "part_id": num_parts,
NTYPE: len(ntypes), NTYPE: len(ntypes),
EID: part_meta["num_edges"], EID: edge_count,
ETYPE: len(etypes), ETYPE: len(etypes),
DGL2GB_EID: part_meta["num_edges"], DGL2GB_EID: edge_count,
GB_DST_ID: part_meta["num_nodes"], GB_DST_ID: node_count,
} }
for attributes in [node_attributes, edge_attributes]: for attributes in [node_attributes, edge_attributes]:
for key in attributes: for key in attributes:
@@ -1779,16 +1795,16 @@ def gb_convert_single_dgl_partition(
) )
indptr, indices, type_per_edge = cast_various_to_minimum_dtype_gb( indptr, indices, type_per_edge = cast_various_to_minimum_dtype_gb(
graph, graph=graph,
part_meta, part_meta=part_meta,
num_parts, num_parts=num_parts,
indptr, indptr=indptr,
indices, indices=indices,
type_per_edge, type_per_edge=type_per_edge,
etypes, etypes=etypes,
ntypes, ntypes=ntypes,
node_attributes, node_attributes=node_attributes,
edge_attributes, edge_attributes=edge_attributes,
) )
csc_graph = gb.fused_csc_sampling_graph( csc_graph = gb.fused_csc_sampling_graph(

File diff suppressed because it is too large Load Diff

View File

@@ -75,6 +75,10 @@ def submit_jobs(args) -> str:
argslist += "--log-level {} ".format(args.log_level) argslist += "--log-level {} ".format(args.log_level)
argslist += "--save-orig-nids " if args.save_orig_nids else "" argslist += "--save-orig-nids " if args.save_orig_nids else ""
argslist += "--save-orig-eids " if args.save_orig_eids else "" argslist += "--save-orig-eids " if args.save_orig_eids else ""
argslist += "--use-graphbolt " if args.use_graphbolt else ""
argslist += "--store-eids " if args.store_eids else ""
argslist += "--store-inner-node " if args.store_inner_node else ""
argslist += "--store-inner-edge " if args.store_inner_edge else ""
argslist += ( argslist += (
f"--graph-formats {args.graph_formats} " if args.graph_formats else "" f"--graph-formats {args.graph_formats} " if args.graph_formats else ""
) )
@@ -159,6 +163,30 @@ def main():
action="store_true", action="store_true",
help="Save original edge IDs into files", help="Save original edge IDs into files",
) )
parser.add_argument(
"--use-graphbolt",
action="store_true",
help="Use GraphBolt for distributed partition.",
)
parser.add_argument(
"--store-inner-node",
action="store_true",
default=False,
help="Store inner nodes.",
)
parser.add_argument(
"--store-inner-edge",
action="store_true",
default=False,
help="Store inner edges.",
)
parser.add_argument(
"--store-eids",
action="store_true",
default=False,
help="Store edge IDs.",
)
parser.add_argument( parser.add_argument(
"--graph-formats", "--graph-formats",
type=str, type=str,

View File

@@ -1,24 +1,25 @@
import argparse import copy
import gc import gc
import json
import logging import logging
import os import os
import time
import constants import constants
import dgl import dgl
import dgl.backend as F
import dgl.graphbolt as gb
import numpy as np import numpy as np
import pandas as pd
import pyarrow
import torch as th import torch as th
from dgl import EID, ETYPE, NID, NTYPE
from dgl.distributed.constants import DGL2GB_EID, GB_DST_ID
from dgl.distributed.partition import ( from dgl.distributed.partition import (
_cast_to_minimum_dtype,
_etype_str_to_tuple, _etype_str_to_tuple,
_etype_tuple_to_str, _etype_tuple_to_str,
cast_various_to_minimum_dtype_gb,
RESERVED_FIELD_DTYPE, RESERVED_FIELD_DTYPE,
) )
from pyarrow import csv from utils import get_idranges, memory_snapshot
from utils import get_idranges, memory_snapshot, read_json
def _get_unique_invidx(srcids, dstids, nids, low_mem=True): def _get_unique_invidx(srcids, dstids, nids, low_mem=True):
@@ -164,7 +165,202 @@ def _get_unique_invidx(srcids, dstids, nids, low_mem=True):
return uniques, idxes, srcids, dstids return uniques, idxes, srcids, dstids
def create_dgl_object( # Utility functions.
def _is_homogeneous(ntypes, etypes):
"""Checks if the provided ntypes and etypes form a homogeneous graph."""
return len(ntypes) == 1 and len(etypes) == 1
def _coo2csc(src_ids, dst_ids):
src_ids, dst_ids = th.tensor(src_ids, dtype=th.int64), th.tensor(
dst_ids, dtype=th.int64
)
num_nodes = th.max(th.stack([src_ids, dst_ids], dim=0)).item() + 1
dst, idx = dst_ids.sort()
indptr = th.searchsorted(dst, th.arange(num_nodes + 1))
indices = src_ids[idx]
return indptr, indices, idx
def _create_edge_data(edgeid_offset, etype_ids, num_edges):
eid = th.arange(
edgeid_offset,
edgeid_offset + num_edges,
dtype=RESERVED_FIELD_DTYPE[dgl.EID],
)
etype = th.as_tensor(etype_ids, dtype=RESERVED_FIELD_DTYPE[dgl.ETYPE])
inner_edge = th.ones(num_edges, dtype=RESERVED_FIELD_DTYPE["inner_edge"])
return eid, etype, inner_edge
def _create_node_data(ntype, uniq_ids, reshuffle_nodes, inner_nodes):
node_type = th.as_tensor(ntype, dtype=RESERVED_FIELD_DTYPE[dgl.NTYPE])
node_id = th.as_tensor(uniq_ids[reshuffle_nodes])
inner_node = th.as_tensor(
inner_nodes[reshuffle_nodes],
dtype=RESERVED_FIELD_DTYPE["inner_node"],
)
return node_type, node_id, inner_node
def _compute_node_ntype(
global_src_id, global_dst_id, global_homo_nid, idx, reshuffle_nodes, id_map
):
global_ids = np.concatenate([global_src_id, global_dst_id, global_homo_nid])
part_global_ids = global_ids[idx]
part_global_ids = part_global_ids[reshuffle_nodes]
ntype, per_type_ids = id_map(part_global_ids)
return ntype, per_type_ids
def _graph_orig_ids(
return_orig_nids,
return_orig_eids,
ntypes_map,
etypes_map,
node_attr,
edge_attr,
per_type_ids,
type_per_edge,
global_edge_id,
):
orig_nids = None
orig_eids = None
if return_orig_nids:
orig_nids = {}
for ntype, ntype_id in ntypes_map.items():
mask = th.logical_and(
node_attr[dgl.NTYPE] == ntype_id,
node_attr["inner_node"],
)
orig_nids[ntype] = th.as_tensor(per_type_ids[mask])
if return_orig_eids:
orig_eids = {}
for etype, etype_id in etypes_map.items():
mask = th.logical_and(
type_per_edge == etype_id,
edge_attr["inner_edge"],
)
orig_eids[_etype_tuple_to_str(etype)] = th.as_tensor(
global_edge_id[mask]
)
return orig_nids, orig_eids
def _create_edge_attr_gb(
part_local_dst_id, edgeid_offset, etype_ids, ntypes, etypes, etypes_map
):
edge_attr = {}
# create edge data in graph.
num_edges = len(part_local_dst_id)
(
edge_attr[dgl.EID],
type_per_edge,
edge_attr["inner_edge"],
) = _create_edge_data(edgeid_offset, etype_ids, num_edges)
assert "inner_edge" in edge_attr
is_homo = _is_homogeneous(ntypes, etypes)
edge_type_to_id = (
{gb.etype_tuple_to_str(("_N", "_E", "_N")): 0}
if is_homo
else {
gb.etype_tuple_to_str(etype): etid
for etype, etid in etypes_map.items()
}
)
return edge_attr, type_per_edge, edge_type_to_id
def _create_node_attr(
idx,
global_src_id,
global_dst_id,
global_homo_nid,
uniq_ids,
reshuffle_nodes,
id_map,
inner_nodes,
):
# compute per_type_ids and ntype for all the nodes in the graph.
ntype, per_type_ids = _compute_node_ntype(
global_src_id,
global_dst_id,
global_homo_nid,
idx,
reshuffle_nodes,
id_map,
)
# create node data in graph.
node_attr = {}
(
node_attr[dgl.NTYPE],
node_attr[dgl.NID],
node_attr["inner_node"],
) = _create_node_data(ntype, uniq_ids, reshuffle_nodes, inner_nodes)
return node_attr, per_type_ids
def remove_attr_gb(
edge_attr, node_attr, store_inner_node, store_inner_edge, store_eids
):
edata, ndata = copy.deepcopy(edge_attr), copy.deepcopy(node_attr)
if not store_inner_edge:
assert "inner_edge" in edata
edata.pop("inner_edge")
if not store_eids:
assert dgl.EID in edata
edata.pop(dgl.EID)
if not store_inner_node:
assert "inner_node" in ndata
ndata.pop("inner_node")
return edata, ndata
def _process_partition_gb(
node_attr,
edge_attr,
type_per_edge,
src_ids,
dst_ids,
sort_etypes,
):
"""Preprocess partitions before saving:
1. format data types.
2. sort csc/csr by tag.
"""
for k, dtype in RESERVED_FIELD_DTYPE.items():
if k in node_attr:
node_attr[k] = F.astype(node_attr[k], dtype)
if k in edge_attr:
edge_attr[k] = F.astype(edge_attr[k], dtype)
indptr, indices, edge_ids = _coo2csc(src_ids, dst_ids)
if sort_etypes:
split_size = th.diff(indptr)
split_indices = th.split(type_per_edge, tuple(split_size), dim=0)
sorted_idxs = []
for split_indice in split_indices:
sorted_idxs.append(split_indice.sort()[1])
sorted_idx = th.cat(sorted_idxs, dim=0)
sorted_idx = (
th.repeat_interleave(indptr[:-1], split_size, dim=0) + sorted_idx
)
return indptr, indices[sorted_idx], edge_ids[sorted_idx]
def create_graph_object(
tot_node_count,
tot_edge_count,
node_count,
edge_count,
num_parts,
schema, schema,
part_id, part_id,
node_data, node_data,
@@ -174,6 +370,8 @@ def create_dgl_object(
edge_typecounts, edge_typecounts,
return_orig_nids=False, return_orig_nids=False,
return_orig_eids=False, return_orig_eids=False,
use_graphbolt=False,
**kwargs,
): ):
""" """
This function creates dgl objects for a given graph partition, as in function This function creates dgl objects for a given graph partition, as in function
@@ -223,6 +421,18 @@ def create_dgl_object(
Parameters: Parameters:
----------- -----------
tot_node_count : int
the number of all nodes
tot_edge_count : int
the number of all edges
node_count : int
the number of nodes in partition
edge_count : int
the number of edges in partition
graph_formats : str
the format of graph
num_parts : int
the number of parts
schame : json object schame : json object
json object created by reading the graph metadata json file json object created by reading the graph metadata json file
part_id : int part_id : int
@@ -449,58 +659,134 @@ def create_dgl_object(
nid_map[part_local_dst_id], nid_map[part_local_dst_id],
) )
"""
Creating attributes for graphbolt and DGLGraph is as follows.
node attributes:
this part is implemented in _create_node_attr.
compute the ntype and per type ids for each node with global node type id.
create ntype, nid and inner node with orig ntype and inner nodes
this part is shared by graphbolt and DGLGraph.
the attributes created for graphbolt are as follows:
edge attributes:
this part is implemented in _create_edge_attr_gb.
create eid, type per edge and inner edge with edgeid_offset.
create edge_type_to_id with etypes_map.
The process to remove extra attribute is implemented in remove_attr_gb.
the unused attributes like inner_node, inner_edge, eids will be removed following the arguments in kwargs.
edge_attr, node_attr are the variable that have removed extra attributes to construct csc_graph.
edata, ndata are the variable that reserve extra attributes to be used to generate orig_nid and orig_eid.
the src_ids and dst_ids will be transformed into indptr and indices in _coo2csc.
all variable mentioned above will be casted to minimum data type in cast_various_to_minimum_dtype_gb.
orig_nids and orig_eids will be generated in _graph_orig_ids with ndata and edata.
"""
# create the graph here now. # create the graph here now.
part_graph = dgl.graph( ndata, per_type_ids = _create_node_attr(
data=(part_local_src_id, part_local_dst_id), num_nodes=len(uniq_ids) idx,
) global_src_id,
part_graph.edata[dgl.EID] = th.arange( global_dst_id,
edgeid_offset, global_homo_nid,
edgeid_offset + part_graph.num_edges(), uniq_ids,
dtype=th.int64, reshuffle_nodes,
) id_map,
part_graph.edata[dgl.ETYPE] = th.as_tensor( inner_nodes,
etype_ids, dtype=RESERVED_FIELD_DTYPE[dgl.ETYPE]
)
part_graph.edata["inner_edge"] = th.ones(
part_graph.num_edges(), dtype=RESERVED_FIELD_DTYPE["inner_edge"]
) )
if use_graphbolt:
edata, type_per_edge, edge_type_to_id = _create_edge_attr_gb(
part_local_dst_id,
edgeid_offset,
etype_ids,
ntypes,
etypes,
etypes_map,
)
# compute per_type_ids and ntype for all the nodes in the graph. assert edata is not None
global_ids = np.concatenate([global_src_id, global_dst_id, global_homo_nid]) assert ndata is not None
part_global_ids = global_ids[idx]
part_global_ids = part_global_ids[reshuffle_nodes]
ntype, per_type_ids = id_map(part_global_ids)
# continue with the graph creation sort_etypes = len(etypes_map) > 1
part_graph.ndata[dgl.NTYPE] = th.as_tensor( indptr, indices, csc_edge_ids = _process_partition_gb(
ntype, dtype=RESERVED_FIELD_DTYPE[dgl.NTYPE] ndata,
edata,
type_per_edge,
part_local_src_id,
part_local_dst_id,
sort_etypes,
)
edge_attr, node_attr = remove_attr_gb(
edge_attr=edata, node_attr=ndata, **kwargs
)
edge_attr = {
attr: edge_attr[attr][csc_edge_ids] for attr in edge_attr.keys()
}
cast_various_to_minimum_dtype_gb(
node_count=node_count,
edge_count=edge_count,
tot_node_count=tot_node_count,
tot_edge_count=tot_edge_count,
num_parts=num_parts,
indptr=indptr,
indices=indices,
type_per_edge=type_per_edge,
etypes=etypes,
ntypes=ntypes,
node_attributes=node_attr,
edge_attributes=edge_attr,
)
part_graph = gb.fused_csc_sampling_graph(
csc_indptr=indptr,
indices=indices,
node_type_offset=None,
type_per_edge=type_per_edge[csc_edge_ids],
node_attributes=node_attr,
edge_attributes=edge_attr,
node_type_to_id=ntypes_map,
edge_type_to_id=edge_type_to_id,
)
else:
num_edges = len(part_local_dst_id)
part_graph = dgl.graph(
data=(part_local_src_id, part_local_dst_id), num_nodes=len(uniq_ids)
)
# create edge data in graph.
(
part_graph.edata[dgl.EID],
part_graph.edata[dgl.ETYPE],
part_graph.edata["inner_edge"],
) = _create_edge_data(edgeid_offset, etype_ids, num_edges)
ndata, per_type_ids = _create_node_attr(
idx,
global_src_id,
global_dst_id,
global_homo_nid,
uniq_ids,
reshuffle_nodes,
id_map,
inner_nodes,
)
for attr_name, node_attributes in ndata.items():
part_graph.ndata[attr_name] = node_attributes
type_per_edge = part_graph.edata[dgl.ETYPE]
ndata, edata = part_graph.ndata, part_graph.edata
# get the original node ids and edge ids from original graph.
orig_nids, orig_eids = _graph_orig_ids(
return_orig_nids,
return_orig_eids,
ntypes_map,
etypes_map,
ndata,
edata,
per_type_ids,
type_per_edge,
global_edge_id,
) )
part_graph.ndata[dgl.NID] = th.as_tensor(uniq_ids[reshuffle_nodes])
part_graph.ndata["inner_node"] = th.as_tensor(
inner_nodes[reshuffle_nodes], dtype=RESERVED_FIELD_DTYPE["inner_node"]
)
orig_nids = None
orig_eids = None
if return_orig_nids:
orig_nids = {}
for ntype, ntype_id in ntypes_map.items():
mask = th.logical_and(
part_graph.ndata[dgl.NTYPE] == ntype_id,
part_graph.ndata["inner_node"],
)
orig_nids[ntype] = th.as_tensor(per_type_ids[mask])
if return_orig_eids:
orig_eids = {}
for etype, etype_id in etypes_map.items():
mask = th.logical_and(
part_graph.edata[dgl.ETYPE] == etype_id,
part_graph.edata["inner_edge"],
)
orig_eids[_etype_tuple_to_str(etype)] = th.as_tensor(
global_edge_id[mask]
)
return ( return (
part_graph, part_graph,
node_map_val, node_map_val,
@@ -523,6 +809,7 @@ def create_metadata_json(
ntypes_map, ntypes_map,
etypes_map, etypes_map,
output_dir, output_dir,
use_graphbolt,
): ):
""" """
Auxiliary function to create json file for the graph partition metadata Auxiliary function to create json file for the graph partition metadata
@@ -549,6 +836,8 @@ def create_metadata_json(
map between edge type(string) and edge_type_id(int) map between edge type(string) and edge_type_id(int)
output_dir : string output_dir : string
directory where the output files are to be stored directory where the output files are to be stored
use_graphbolt : bool
whether to use graphbolt or not
Returns: Returns:
-------- --------
@@ -572,10 +861,14 @@ def create_metadata_json(
part_dir = "part" + str(part_id) part_dir = "part" + str(part_id)
node_feat_file = os.path.join(part_dir, "node_feat.dgl") node_feat_file = os.path.join(part_dir, "node_feat.dgl")
edge_feat_file = os.path.join(part_dir, "edge_feat.dgl") edge_feat_file = os.path.join(part_dir, "edge_feat.dgl")
part_graph_file = os.path.join(part_dir, "graph.dgl") if use_graphbolt:
part_graph_file = os.path.join(part_dir, "fused_csc_sampling_graph.pt")
else:
part_graph_file = os.path.join(part_dir, "graph.dgl")
part_graph_type = "part_graph_graphbolt" if use_graphbolt else "part_graph"
part_metadata["part-{}".format(part_id)] = { part_metadata["part-{}".format(part_id)] = {
"node_feats": node_feat_file, "node_feats": node_feat_file,
"edge_feats": edge_feat_file, "edge_feats": edge_feat_file,
"part_graph": part_graph_file, part_graph_type: part_graph_file,
} }
return part_metadata return part_metadata

View File

@@ -94,6 +94,30 @@ if __name__ == "__main__":
action="store_true", action="store_true",
help="Save original edge IDs into files", help="Save original edge IDs into files",
) )
parser.add_argument(
"--use-graphbolt",
action="store_true",
help="Use GraphBolt for distributed partition.",
)
parser.add_argument(
"--store-inner-node",
action="store_true",
default=False,
help="Store inner nodes.",
)
parser.add_argument(
"--store-inner-edge",
action="store_true",
default=False,
help="Store inner edges.",
)
parser.add_argument(
"--store-eids",
action="store_true",
default=False,
help="Store edge IDs.",
)
parser.add_argument( parser.add_argument(
"--graph-formats", "--graph-formats",
default=None, default=None,
@@ -101,7 +125,6 @@ if __name__ == "__main__":
help="Save partitions in specified formats.", help="Save partitions in specified formats.",
) )
params = parser.parse_args() params = parser.parse_args()
# invoke the pipeline function # invoke the pipeline function
numeric_level = getattr(logging, params.log_level.upper(), None) numeric_level = getattr(logging, params.log_level.upper(), None)
logging.basicConfig( logging.basicConfig(

View File

@@ -13,7 +13,7 @@ import numpy as np
import torch import torch
import torch.distributed as dist import torch.distributed as dist
import torch.multiprocessing as mp import torch.multiprocessing as mp
from convert_partition import create_dgl_object, create_metadata_json from convert_partition import create_graph_object, create_metadata_json
from dataset_utils import get_dataset from dataset_utils import get_dataset
from dist_lookup import DistLookupService from dist_lookup import DistLookupService
from globalids import ( from globalids import (
@@ -1121,7 +1121,6 @@ def gen_dist_partitions(rank, world_size, params):
) )
id_map = dgl.distributed.id_map.IdMap(global_nid_ranges) id_map = dgl.distributed.id_map.IdMap(global_nid_ranges)
id_lookup.set_idMap(id_map) id_lookup.set_idMap(id_map)
# read input graph files and augment these datastructures with # read input graph files and augment these datastructures with
# appropriate information (global_nid and owner process) for node and edge data # appropriate information (global_nid and owner process) for node and edge data
( (
@@ -1315,6 +1314,8 @@ def gen_dist_partitions(rank, world_size, params):
) )
local_node_data = prepare_local_data(node_data, local_part_id) local_node_data = prepare_local_data(node_data, local_part_id)
local_edge_data = prepare_local_data(edge_data, local_part_id) local_edge_data = prepare_local_data(edge_data, local_part_id)
tot_node_count = sum(schema_map["num_nodes_per_type"])
tot_edge_count = sum(schema_map["num_edges_per_type"])
( (
graph_obj, graph_obj,
ntypes_map_val, ntypes_map_val,
@@ -1323,7 +1324,12 @@ def gen_dist_partitions(rank, world_size, params):
etypes_map, etypes_map,
orig_nids, orig_nids,
orig_eids, orig_eids,
) = create_dgl_object( ) = create_graph_object(
tot_node_count,
tot_edge_count,
node_count,
edge_count,
params.num_parts,
schema_map, schema_map,
rank + local_part_id * world_size, rank + local_part_id * world_size,
local_node_data, local_node_data,
@@ -1334,8 +1340,12 @@ def gen_dist_partitions(rank, world_size, params):
schema_map[constants.STR_NUM_NODES_PER_TYPE], schema_map[constants.STR_NUM_NODES_PER_TYPE],
), ),
edge_typecounts, edge_typecounts,
params.save_orig_nids, return_orig_nids=params.save_orig_nids,
params.save_orig_eids, return_orig_eids=params.save_orig_eids,
use_graphbolt=params.use_graphbolt,
store_inner_node=params.store_inner_node,
store_inner_edge=params.store_inner_edge,
store_eids=params.store_eids,
) )
sort_etypes = len(etypes_map) > 1 sort_etypes = len(etypes_map) > 1
local_node_features = prepare_local_data( local_node_features = prepare_local_data(
@@ -1354,8 +1364,12 @@ def gen_dist_partitions(rank, world_size, params):
orig_eids, orig_eids,
graph_formats, graph_formats,
sort_etypes, sort_etypes,
params.use_graphbolt,
) )
memory_snapshot("DiskWriteDGLObjectsComplete: ", rank) if params.use_graphbolt:
memory_snapshot("DiskWriteGrapgboltObjectsComplete: ", rank)
else:
memory_snapshot("DiskWriteDGLObjectsComplete: ", rank)
# get the meta-data # get the meta-data
json_metadata = create_metadata_json( json_metadata = create_metadata_json(
@@ -1369,6 +1383,7 @@ def gen_dist_partitions(rank, world_size, params):
ntypes_map, ntypes_map,
etypes_map, etypes_map,
params.output, params.output,
params.use_graphbolt,
) )
output_meta_json[ output_meta_json[
"local-part-id-" + str(local_part_id * world_size + rank) "local-part-id-" + str(local_part_id * world_size + rank)

View File

@@ -504,6 +504,20 @@ def write_edge_features(edge_features, edge_file):
dgl.data.utils.save_tensors(edge_file, edge_features) dgl.data.utils.save_tensors(edge_file, edge_features)
def write_graph_graghbolt(graph_file, graph_obj):
"""
Utility function to serialize FusedCSCSamplingGraph
Parameters:
-----------
graph_obj : FusedCSCSamplingGraph
FusedCSCSamplingGraph, as created in convert_partition.py, which is to be serialized
graph_file : string
File name in which graph object is serialized
"""
torch.save(graph_obj, graph_file)
def write_graph_dgl(graph_file, graph_obj, formats, sort_etypes): def write_graph_dgl(graph_file, graph_obj, formats, sort_etypes):
""" """
Utility function to serialize graph dgl objects Utility function to serialize graph dgl objects
@@ -519,9 +533,23 @@ def write_graph_dgl(graph_file, graph_obj, formats, sort_etypes):
sort_etypes : bool sort_etypes : bool
Whether to sort etypes in csc/csr. Whether to sort etypes in csc/csr.
""" """
dgl.distributed.partition._save_graphs( dgl.distributed.partition.process_partitions(
graph_file, [graph_obj], formats, sort_etypes graph_obj, formats, sort_etypes
) )
dgl.save_graphs(graph_file, [graph_obj], formats=formats)
def _write_graph(
part_dir, graph_obj, formats=None, sort_etypes=None, use_graphbolt=False
):
if use_graphbolt:
write_graph_graghbolt(
os.path.join(part_dir, "fused_csc_sampling_graph.pt"), graph_obj
)
else:
write_graph_dgl(
os.path.join(part_dir, "graph.dgl"), graph_obj, formats, sort_etypes
)
def write_dgl_objects( def write_dgl_objects(
@@ -534,6 +562,7 @@ def write_dgl_objects(
orig_eids, orig_eids,
formats, formats,
sort_etypes, sort_etypes,
use_graphbolt,
): ):
""" """
Wrapper function to write graph, node/edge feature, original node/edge IDs. Wrapper function to write graph, node/edge feature, original node/edge IDs.
@@ -558,13 +587,18 @@ def write_dgl_objects(
Save graph in formats. Save graph in formats.
sort_etypes : bool sort_etypes : bool
Whether to sort etypes in csc/csr. Whether to sort etypes in csc/csr.
use_graphbolt : bool
Whether to use graphbolt or not.
""" """
part_dir = output_dir + "/part" + str(part_id) part_dir = output_dir + "/part" + str(part_id)
os.makedirs(part_dir, exist_ok=True) os.makedirs(part_dir, exist_ok=True)
write_graph_dgl( _write_graph(
os.path.join(part_dir, "graph.dgl"), graph_obj, formats, sort_etypes part_dir,
graph_obj,
formats=formats,
sort_etypes=sort_etypes,
use_graphbolt=use_graphbolt,
) )
if node_features != None: if node_features != None:
write_node_features( write_node_features(
node_features, os.path.join(part_dir, "node_feat.dgl") node_features, os.path.join(part_dir, "node_feat.dgl")