mirror of
https://github.com/dmlc/dgl.git
synced 2026-06-04 19:44:23 +08:00
[Sampling] Separate DistNodeDataLoader from NodeDataLoader (#3578)
* separate distdataloader from node/edgedataloader * oops * docs * change user guide * changes * revert * fix
This commit is contained in:
@@ -17,6 +17,8 @@ and an ``EdgeDataLoader`` for edge/link prediction task.
|
||||
.. autoclass:: NodeDataLoader
|
||||
.. autoclass:: EdgeDataLoader
|
||||
.. autoclass:: GraphDataLoader
|
||||
.. autoclass:: DistNodeDataLoader
|
||||
.. autoclass:: DistEdgeDataLoader
|
||||
|
||||
.. _api-dataloading-neighbor-sampling:
|
||||
|
||||
|
||||
@@ -202,20 +202,20 @@ DGL provides two levels of APIs for sampling nodes and edges to generate mini-ba
|
||||
(see the section of mini-batch training). The low-level APIs require users to write code
|
||||
to explicitly define how a layer of nodes are sampled (e.g., using :func:`dgl.sampling.sample_neighbors` ).
|
||||
The high-level sampling APIs implement a few popular sampling algorithms for node classification
|
||||
and link prediction tasks (e.g., :class:`~dgl.dataloading.pytorch.NodeDataloader` and
|
||||
:class:`~dgl.dataloading.pytorch.EdgeDataloader` ).
|
||||
and link prediction tasks (e.g., :class:`~dgl.dataloading.pytorch.NodeDataLoader` and
|
||||
:class:`~dgl.dataloading.pytorch.EdgeDataLoader` ).
|
||||
|
||||
The distributed sampling module follows the same design and provides two levels of sampling APIs.
|
||||
For the lower-level sampling API, it provides :func:`~dgl.distributed.sample_neighbors` for
|
||||
distributed neighborhood sampling on :class:`~dgl.distributed.DistGraph`. In addition, DGL provides
|
||||
a distributed Dataloader (:class:`~dgl.distributed.DistDataLoader` ) for distributed sampling.
|
||||
The distributed Dataloader has the same interface as Pytorch DataLoader except that users cannot
|
||||
a distributed DataLoader (:class:`~dgl.distributed.DistDataLoader` ) for distributed sampling.
|
||||
The distributed DataLoader has the same interface as Pytorch DataLoader except that users cannot
|
||||
specify the number of worker processes when creating a dataloader. The worker processes are created
|
||||
in :func:`dgl.distributed.initialize`.
|
||||
|
||||
**Note**: When running :func:`dgl.distributed.sample_neighbors` on :class:`~dgl.distributed.DistGraph`,
|
||||
the sampler cannot run in Pytorch Dataloader with multiple worker processes. The main reason is that
|
||||
Pytorch Dataloader creates new sampling worker processes in every epoch, which leads to creating and
|
||||
the sampler cannot run in Pytorch DataLoader with multiple worker processes. The main reason is that
|
||||
Pytorch DataLoader creates new sampling worker processes in every epoch, which leads to creating and
|
||||
destroying :class:`~dgl.distributed.DistGraph` objects many times.
|
||||
|
||||
When using the low-level API, the sampling code is similar to single-process sampling. The only
|
||||
@@ -240,17 +240,17 @@ difference is that users need to use :func:`dgl.distributed.sample_neighbors` an
|
||||
for batch in dataloader:
|
||||
...
|
||||
|
||||
The same high-level sampling APIs (:class:`~dgl.dataloading.pytorch.NodeDataloader` and
|
||||
:class:`~dgl.dataloading.pytorch.EdgeDataloader` ) work for both :class:`~dgl.DGLGraph`
|
||||
and :class:`~dgl.distributed.DistGraph`. When using :class:`~dgl.dataloading.pytorch.NodeDataloader`
|
||||
and :class:`~dgl.dataloading.pytorch.EdgeDataloader`, the distributed sampling code is exactly
|
||||
the same as single-process sampling.
|
||||
The high-level sampling APIs (:class:`~dgl.dataloading.pytorch.NodeDataLoader` and
|
||||
:class:`~dgl.dataloading.pytorch.EdgeDataLoader` ) has distributed counterparts
|
||||
(:class:`~dgl.dataloading.pytorch.DistNodeDataLoader` and
|
||||
:class:`~dgl.dataloading.pytorch.DistEdgeDataLoader`). The code is exactly the
|
||||
same as single-process sampling otherwise.
|
||||
|
||||
.. code:: python
|
||||
|
||||
sampler = dgl.sampling.MultiLayerNeighborSampler([10, 25])
|
||||
dataloader = dgl.sampling.NodeDataLoader(g, train_nid, sampler,
|
||||
batch_size=batch_size, shuffle=True)
|
||||
dataloader = dgl.sampling.DistNodeDataLoader(g, train_nid, sampler,
|
||||
batch_size=batch_size, shuffle=True)
|
||||
for batch in dataloader:
|
||||
...
|
||||
|
||||
|
||||
@@ -177,9 +177,9 @@ DGL提供了一个稀疏的Adagrad优化器 :class:`~dgl.distributed.SparseAdagr
|
||||
DGL提供了两个级别的API,用于对节点和边进行采样以生成小批次训练数据(请参阅小批次训练的章节)。
|
||||
底层API要求用户编写代码以明确定义如何对节点层进行采样(例如,使用 :func:`dgl.sampling.sample_neighbors` )。
|
||||
高层采样API为节点分类和链接预测任务实现了一些流行的采样算法(例如
|
||||
:class:`~dgl.dataloading.pytorch.NodeDataloader`
|
||||
:class:`~dgl.dataloading.pytorch.NodeDataLoader`
|
||||
和
|
||||
:class:`~dgl.dataloading.pytorch.EdgeDataloader` )。
|
||||
:class:`~dgl.dataloading.pytorch.EdgeDataLoader` )。
|
||||
|
||||
分布式采样模块遵循相同的设计,也提供两个级别的采样API。对于底层的采样API,它为
|
||||
:class:`~dgl.distributed.DistGraph` 上的分布式邻居采样提供了
|
||||
@@ -188,7 +188,7 @@ DGL提供了两个级别的API,用于对节点和边进行采样以生成小
|
||||
分布式数据加载器具有与PyTorch DataLoader相同的接口。其中的工作进程(worker)在 :func:`dgl.distributed.initialize` 中创建。
|
||||
|
||||
**Note**: 在 :class:`~dgl.distributed.DistGraph` 上运行 :func:`dgl.distributed.sample_neighbors` 时,
|
||||
采样器无法在具有多个工作进程的PyTorch Dataloader中运行。主要原因是PyTorch Dataloader在每个训练周期都会创建新的采样工作进程,
|
||||
采样器无法在具有多个工作进程的PyTorch DataLoader中运行。主要原因是PyTorch DataLoader在每个训练周期都会创建新的采样工作进程,
|
||||
从而导致多次创建和删除 :class:`~dgl.distributed.DistGraph` 对象。
|
||||
|
||||
使用底层API时,采样代码类似于单进程采样。唯一的区别是用户需要使用
|
||||
@@ -214,19 +214,19 @@ DGL提供了两个级别的API,用于对节点和边进行采样以生成小
|
||||
for batch in dataloader:
|
||||
...
|
||||
|
||||
:class:`~dgl.DGLGraph` 和 :class:`~dgl.distributed.DistGraph` 都可以使用相同的高级采样API(
|
||||
:class:`~dgl.dataloading.pytorch.NodeDataloader`
|
||||
:class:`~dgl.dataloading.pytorch.NodeDataLoader`
|
||||
和
|
||||
:class:`~dgl.dataloading.pytorch.EdgeDataloader`)。使用
|
||||
:class:`~dgl.dataloading.pytorch.NodeDataloader`
|
||||
:class:`~dgl.dataloading.pytorch.EdgeDataLoader` 有分布式的版本
|
||||
:class:`~dgl.dataloading.pytorch.DistNodeDataLoader`
|
||||
和
|
||||
:class:`~dgl.dataloading.pytorch.EdgeDataloader` 时,分布式采样代码与单进程采样完全相同。
|
||||
:class:`~dgl.dataloading.pytorch.DistEdgeDataLoader` 。使用
|
||||
时分布式采样代码与单进程采样几乎完全相同。
|
||||
|
||||
.. code:: python
|
||||
|
||||
sampler = dgl.sampling.MultiLayerNeighborSampler([10, 25])
|
||||
dataloader = dgl.sampling.NodeDataLoader(g, train_nid, sampler,
|
||||
batch_size=batch_size, shuffle=True)
|
||||
dataloader = dgl.sampling.DistNodeDataLoader(g, train_nid, sampler,
|
||||
batch_size=batch_size, shuffle=True)
|
||||
for batch in dataloader:
|
||||
...
|
||||
|
||||
|
||||
@@ -132,8 +132,8 @@ DGL은 노드 임베딩들을 필요로 하는 변환 모델(transductive models
|
||||
분산 샘플링
|
||||
~~~~~~~~
|
||||
|
||||
DGL은 미니-배치를 생성하기 위해 노드 및 에지 샘플링을 하는 두 수준의 API를 제공한다 (미니-배치 학습 섹션 참조). Low-level API는 노드들의 레이어가 어떻게 샘플링될지를 명시적으로 정의하는 코드를 직접 작성해야한다 (예를 들면, :func:`dgl.sampling.sample_neighbors` 사용해서). High-level API는 노드 분류 및 링크 예측(예, :class:`~dgl.dataloading.pytorch.NodeDataloader` 와
|
||||
:class:`~dgl.dataloading.pytorch.EdgeDataloader`) 에 사용되는 몇 가지 유명한 샘플링 알고리즘을 구현하고 있다.
|
||||
DGL은 미니-배치를 생성하기 위해 노드 및 에지 샘플링을 하는 두 수준의 API를 제공한다 (미니-배치 학습 섹션 참조). Low-level API는 노드들의 레이어가 어떻게 샘플링될지를 명시적으로 정의하는 코드를 직접 작성해야한다 (예를 들면, :func:`dgl.sampling.sample_neighbors` 사용해서). High-level API는 노드 분류 및 링크 예측(예, :class:`~dgl.dataloading.pytorch.NodeDataLoader` 와
|
||||
:class:`~dgl.dataloading.pytorch.EdgeDataLoader`) 에 사용되는 몇 가지 유명한 샘플링 알고리즘을 구현하고 있다.
|
||||
|
||||
분산 샘플링 모듈도 같은 디자인을 따르고 있고, 두 level의 샘플링 API를 제공한다. Low-level 샘플링 API의 경우, :class:`~dgl.distributed.DistGraph` 에 대한 분산 이웃 샘플링을 위해 :func:`~dgl.distributed.sample_neighbors` 가 있다. 또한, DGL은 분산 샘플링을 위해 분산 데이터 로더, :class:`~dgl.distributed.DistDataLoader` 를 제공한다. 분산 DataLoader는 PyTorch DataLoader와 같은 인터페이스를 갖는데, 다른 점은 사용자가 데이터 로더를 생성할 때 worker 프로세스의 개수를 지정할 수 없다는 점이다. Worker 프로세스들은 :func:`dgl.distributed.initialize` 에서 만들어진다.
|
||||
|
||||
@@ -159,13 +159,13 @@ Low-level API를 사용할 때, 샘플링 코드는 단일 프로세스 샘플
|
||||
for batch in dataloader:
|
||||
...
|
||||
|
||||
동일한 high-level 샘플링 API들(:class:`~dgl.dataloading.pytorch.NodeDataloader` 와 :class:`~dgl.dataloading.pytorch.EdgeDataloader` )이 :class:`~dgl.DGLGraph` 와 :class:`~dgl.distributed.DistGraph` 에 대해서 동작한다. :class:`~dgl.dataloading.pytorch.NodeDataloader` 과 :class:`~dgl.dataloading.pytorch.EdgeDataloader` 를 사용할 때, 분산 샘플링 코드는 싱글-프로세스 샘플링 코드와 정확하게 같다.
|
||||
동일한 high-level 샘플링 API들(:class:`~dgl.dataloading.pytorch.NodeDataLoader` 와 :class:`~dgl.dataloading.pytorch.EdgeDataLoader` )이 :class:`~dgl.DGLGraph` 와 :class:`~dgl.distributed.DistGraph` 에 대해서 동작한다. :class:`~dgl.dataloading.pytorch.NodeDataLoader` 과 :class:`~dgl.dataloading.pytorch.EdgeDataLoader` 를 사용할 때, 분산 샘플링 코드는 싱글-프로세스 샘플링 코드와 정확하게 같다.
|
||||
|
||||
.. code:: python
|
||||
|
||||
sampler = dgl.sampling.MultiLayerNeighborSampler([10, 25])
|
||||
dataloader = dgl.sampling.NodeDataLoader(g, train_nid, sampler,
|
||||
batch_size=batch_size, shuffle=True)
|
||||
dataloader = dgl.sampling.DistNodeDataLoader(g, train_nid, sampler,
|
||||
batch_size=batch_size, shuffle=True)
|
||||
for batch in dataloader:
|
||||
...
|
||||
|
||||
|
||||
@@ -69,7 +69,7 @@ class SAGE(nn.Module):
|
||||
y = th.zeros(g.number_of_nodes(), self.n_hidden if l != len(self.layers) - 1 else self.n_classes)
|
||||
|
||||
sampler = dgl.dataloading.MultiLayerNeighborSampler([None])
|
||||
dataloader = dgl.dataloading.NodeDataLoader(
|
||||
dataloader = dgl.dataloading.DistNodeDataLoader(
|
||||
g,
|
||||
th.arange(g.number_of_nodes()),
|
||||
sampler,
|
||||
|
||||
@@ -366,7 +366,7 @@ def run(args, device, data):
|
||||
val_fanouts = [int(fanout) for fanout in args.validation_fanout.split(',')]
|
||||
|
||||
sampler = dgl.dataloading.MultiLayerNeighborSampler(fanouts)
|
||||
dataloader = dgl.dataloading.NodeDataLoader(
|
||||
dataloader = dgl.dataloading.DistNodeDataLoader(
|
||||
g,
|
||||
{'paper': train_nid},
|
||||
sampler,
|
||||
@@ -375,7 +375,7 @@ def run(args, device, data):
|
||||
drop_last=False)
|
||||
|
||||
valid_sampler = dgl.dataloading.MultiLayerNeighborSampler(val_fanouts)
|
||||
valid_dataloader = dgl.dataloading.NodeDataLoader(
|
||||
valid_dataloader = dgl.dataloading.DistNodeDataLoader(
|
||||
g,
|
||||
{'paper': val_nid},
|
||||
valid_sampler,
|
||||
@@ -384,7 +384,7 @@ def run(args, device, data):
|
||||
drop_last=False)
|
||||
|
||||
test_sampler = dgl.dataloading.MultiLayerNeighborSampler(val_fanouts)
|
||||
test_dataloader = dgl.dataloading.NodeDataLoader(
|
||||
test_dataloader = dgl.dataloading.DistNodeDataLoader(
|
||||
g,
|
||||
{'paper': test_nid},
|
||||
test_sampler,
|
||||
|
||||
@@ -287,6 +287,9 @@ class TemporalEdgeDataLoader(dgl.dataloading.EdgeDataLoader):
|
||||
if dataloader_kwargs.get('num_workers', 0) > 0:
|
||||
g.create_formats_()
|
||||
|
||||
def __iter__(self):
|
||||
return iter(self.dataloader)
|
||||
|
||||
# ====== Fast Mode ======
|
||||
|
||||
# Part of code in reservoir sampling comes from PyG library
|
||||
|
||||
@@ -292,7 +292,7 @@ if __name__ == "__main__":
|
||||
if i < args.epochs-1 and args.fast_mode:
|
||||
sampler.reset()
|
||||
print(log_content[0], log_content[1], log_content[2])
|
||||
except:
|
||||
except KeyboardInterrupt:
|
||||
traceback.print_exc()
|
||||
error_content = "Training Interreputed!"
|
||||
f.writelines(error_content)
|
||||
|
||||
@@ -1,2 +1,3 @@
|
||||
"""DGL PyTorch DataLoader module."""
|
||||
from .dataloader import *
|
||||
from .dist_dataloader import *
|
||||
|
||||
@@ -6,9 +6,8 @@ import torch as th
|
||||
from torch.utils.data import DataLoader, IterableDataset
|
||||
from torch.utils.data.distributed import DistributedSampler
|
||||
import torch.distributed as dist
|
||||
from ..dataloader import NodeCollator, EdgeCollator, GraphCollator, SubgraphIterator, BlockSampler
|
||||
from ...distributed import DistGraph
|
||||
from ...distributed import DistDataLoader
|
||||
from ..dataloader import NodeCollator, EdgeCollator, GraphCollator, SubgraphIterator, BlockSampler
|
||||
from ...ndarray import NDArray as DGLNDArray
|
||||
from ... import backend as F
|
||||
from ...base import DGLError
|
||||
@@ -24,6 +23,10 @@ PYTORCH_VER = LooseVersion(th.__version__)
|
||||
PYTORCH_16 = PYTORCH_VER >= LooseVersion("1.6.0")
|
||||
PYTORCH_17 = PYTORCH_VER >= LooseVersion("1.7.0")
|
||||
|
||||
def _check_graph_type(g):
|
||||
if isinstance(g, DistGraph):
|
||||
raise TypeError("Please use DistNodeDataLoader or DistEdgeDataLoader for DistGraph")
|
||||
|
||||
def _create_dist_sampler(dataset, dataloader_kwargs, ddp_seed):
|
||||
# Note: will change the content of dataloader_kwargs
|
||||
dist_sampler_kwargs = {'shuffle': dataloader_kwargs['shuffle']}
|
||||
@@ -164,14 +167,6 @@ class _ScalarDataBatcher(th.utils.data.IterableDataset):
|
||||
"""Set epoch number for distributed training."""
|
||||
self.epoch = epoch
|
||||
|
||||
def _remove_kwargs_dist(kwargs):
|
||||
if 'num_workers' in kwargs:
|
||||
del kwargs['num_workers']
|
||||
if 'pin_memory' in kwargs:
|
||||
del kwargs['pin_memory']
|
||||
print('Distributed DataLoader does not support pin_memory')
|
||||
return kwargs
|
||||
|
||||
# The following code is a fix to the PyTorch-specific issue in
|
||||
# https://github.com/dmlc/dgl/issues/2137
|
||||
#
|
||||
@@ -288,14 +283,14 @@ def _restore_storages(subgs, g):
|
||||
_restore_subgraph_storage(subg, g)
|
||||
|
||||
class _NodeCollator(NodeCollator):
|
||||
def collate(self, items):
|
||||
def collate(self, items): # pylint: disable=missing-docstring
|
||||
# input_nodes, output_nodes, blocks
|
||||
result = super().collate(items)
|
||||
_pop_storages(result[-1], self.g)
|
||||
return result
|
||||
|
||||
class _EdgeCollator(EdgeCollator):
|
||||
def collate(self, items):
|
||||
def collate(self, items): # pylint: disable=missing-docstring
|
||||
if self.negative_sampler is None:
|
||||
# input_nodes, pair_graph, blocks
|
||||
result = super().collate(items)
|
||||
@@ -325,10 +320,10 @@ def _to_device(data, device):
|
||||
return recursive_apply(data, lambda x: x.to(device))
|
||||
|
||||
class _NodeDataLoaderIter:
|
||||
def __init__(self, node_dataloader):
|
||||
def __init__(self, node_dataloader, iter_):
|
||||
self.device = node_dataloader.device
|
||||
self.node_dataloader = node_dataloader
|
||||
self.iter_ = iter(node_dataloader.dataloader)
|
||||
self.iter_ = iter_
|
||||
self.async_load = node_dataloader.async_load
|
||||
if self.async_load:
|
||||
if F.device_type(self.device) != 'cuda':
|
||||
@@ -350,10 +345,10 @@ class _NodeDataLoaderIter:
|
||||
return result
|
||||
|
||||
class _EdgeDataLoaderIter:
|
||||
def __init__(self, edge_dataloader):
|
||||
def __init__(self, edge_dataloader, iter_):
|
||||
self.device = edge_dataloader.device
|
||||
self.edge_dataloader = edge_dataloader
|
||||
self.iter_ = iter(edge_dataloader.dataloader)
|
||||
self.iter_ = iter_
|
||||
|
||||
# Make this an iterator for PyTorch Lightning compatibility
|
||||
def __iter__(self):
|
||||
@@ -373,9 +368,9 @@ class _EdgeDataLoaderIter:
|
||||
return result
|
||||
|
||||
class _GraphDataLoaderIter:
|
||||
def __init__(self, graph_dataloader):
|
||||
def __init__(self, graph_dataloader, iter_):
|
||||
self.dataloader = graph_dataloader
|
||||
self.iter_ = iter(graph_dataloader.dataloader)
|
||||
self.iter_ = iter_
|
||||
|
||||
def __iter__(self):
|
||||
return self
|
||||
@@ -422,14 +417,9 @@ def _init_dataloader(collator, device, dataloader_kwargs, use_ddp, ddp_seed):
|
||||
else:
|
||||
dist_sampler = None
|
||||
|
||||
dataloader = DataLoader(
|
||||
dataset,
|
||||
collate_fn=collator.collate,
|
||||
**dataloader_kwargs)
|
||||
return use_scalar_batcher, scalar_batcher, dataset, collator, dist_sampler
|
||||
|
||||
return use_scalar_batcher, scalar_batcher, dataloader, dist_sampler
|
||||
|
||||
class NodeDataLoader:
|
||||
class NodeDataLoader(DataLoader):
|
||||
"""PyTorch dataloader for batch-iterating over a set of nodes, generating the list
|
||||
of message flow graphs (MFGs) as computation dependency of the said minibatch.
|
||||
|
||||
@@ -555,10 +545,10 @@ class NodeDataLoader:
|
||||
single-GPU and the whole graph does not fit in GPU memory.
|
||||
"""
|
||||
collator_arglist = inspect.getfullargspec(NodeCollator).args
|
||||
|
||||
def __init__(self, g, nids, graph_sampler, device=None, use_ddp=False, ddp_seed=0,
|
||||
load_input=None, load_output=None, load_ndata=None, load_edata=None,
|
||||
async_load=False, **kwargs):
|
||||
_check_graph_type(g)
|
||||
collator_kwargs = {}
|
||||
dataloader_kwargs = {}
|
||||
for k, v in kwargs.items():
|
||||
@@ -567,75 +557,54 @@ class NodeDataLoader:
|
||||
else:
|
||||
dataloader_kwargs[k] = v
|
||||
|
||||
if isinstance(g, DistGraph):
|
||||
if device is None:
|
||||
# for the distributed case default to the CPU
|
||||
device = 'cpu'
|
||||
assert device == 'cpu', 'Only cpu is supported in the case of a DistGraph.'
|
||||
# Distributed DataLoader currently does not support heterogeneous graphs
|
||||
# and does not copy features. Fallback to normal solution
|
||||
self.collator = NodeCollator(g, nids, graph_sampler, **collator_kwargs)
|
||||
_remove_kwargs_dist(dataloader_kwargs)
|
||||
self.dataloader = DistDataLoader(self.collator.dataset,
|
||||
collate_fn=self.collator.collate,
|
||||
**dataloader_kwargs)
|
||||
self.is_distributed = True
|
||||
if device is None:
|
||||
# default to the same device the graph is on
|
||||
device = th.device(g.device)
|
||||
|
||||
self.async_load = async_load
|
||||
self.load_input = {} if load_input is None else load_input
|
||||
self.load_output = {} if load_output is None else load_output
|
||||
self.load_ndata = {} if load_ndata is None else load_ndata
|
||||
# TODO(BarclayII): The samplers may not always return edge IDs due to
|
||||
# efficiency concerns (see the comment in BlockSampler.__init__()).
|
||||
# However, the async copy wrapper relies on EID in the returned subgraph
|
||||
# to fetch edge features. Therefore to make edge feature fetching work,
|
||||
# return_eids must be True. If we have a graph-level lazy index implemented
|
||||
# we can potentially relax this constraint.
|
||||
if (async_load and load_edata is not None and
|
||||
not getattr(graph_sampler, "return_eids", True)):
|
||||
raise DGLError("sampler's return_eids must be True if load_edata is not None")
|
||||
self.load_edata = {} if load_edata is None else load_edata
|
||||
|
||||
# if the sampler supports it, tell it to output to the specified device.
|
||||
# But if async_load is enabled, set_output_context should be skipped as
|
||||
# we'd like to avoid any graph/data transfer graphs across devices in
|
||||
# sampler. Such transfer will be handled in dataloader.
|
||||
num_workers = dataloader_kwargs.get('num_workers', 0)
|
||||
if ((not async_load) and
|
||||
callable(getattr(graph_sampler, "set_output_context", None)) and
|
||||
num_workers == 0):
|
||||
graph_sampler.set_output_context(to_dgl_context(device))
|
||||
else:
|
||||
if device is None:
|
||||
# default to the same device the graph is on
|
||||
device = th.device(g.device)
|
||||
graph_sampler.set_output_context(to_dgl_context(th.device('cpu')))
|
||||
|
||||
self.async_load = async_load
|
||||
self.load_input = {} if load_input is None else load_input
|
||||
self.load_output = {} if load_output is None else load_output
|
||||
self.load_ndata = {} if load_ndata is None else load_ndata
|
||||
# TODO(BarclayII): The samplers may not always return edge IDs due to
|
||||
# efficiency concerns (see the comment in BlockSampler.__init__()).
|
||||
# However, the async copy wrapper relies on EID in the returned subgraph
|
||||
# to fetch edge features. Therefore to make edge feature fetching work,
|
||||
# return_eids must be True. If we have a graph-level lazy index implemented
|
||||
# we can potentially relax this constraint.
|
||||
if (async_load and load_edata is not None and
|
||||
not getattr(graph_sampler, "return_eids", True)):
|
||||
raise DGLError("sampler's return_eids must be True if load_edata is not None")
|
||||
self.load_edata = {} if load_edata is None else load_edata
|
||||
self.collator = _NodeCollator(g, nids, graph_sampler, **collator_kwargs)
|
||||
self.use_scalar_batcher, self.scalar_batcher, dataset, collator, self.dist_sampler = \
|
||||
_init_dataloader(self.collator, device, dataloader_kwargs, use_ddp, ddp_seed)
|
||||
super().__init__(dataset, collate_fn=collator.collate, **dataloader_kwargs)
|
||||
|
||||
# if the sampler supports it, tell it to output to the specified device.
|
||||
# But if async_load is enabled, set_output_context should be skipped as
|
||||
# we'd like to avoid any graph/data transfer graphs across devices in
|
||||
# sampler. Such transfer will be handled in dataloader.
|
||||
num_workers = dataloader_kwargs.get('num_workers', 0)
|
||||
if ((not async_load) and
|
||||
callable(getattr(graph_sampler, "set_output_context", None)) and
|
||||
num_workers == 0):
|
||||
graph_sampler.set_output_context(to_dgl_context(device))
|
||||
else:
|
||||
graph_sampler.set_output_context(to_dgl_context(th.device('cpu')))
|
||||
self.use_ddp = use_ddp
|
||||
self.is_distributed = False
|
||||
|
||||
self.collator = _NodeCollator(g, nids, graph_sampler, **collator_kwargs)
|
||||
self.use_scalar_batcher, self.scalar_batcher, self.dataloader, self.dist_sampler = \
|
||||
_init_dataloader(self.collator, device, dataloader_kwargs, use_ddp, ddp_seed)
|
||||
|
||||
self.use_ddp = use_ddp
|
||||
self.is_distributed = False
|
||||
|
||||
# Precompute the CSR and CSC representations so each subprocess does not
|
||||
# duplicate.
|
||||
if num_workers > 0:
|
||||
g.create_formats_()
|
||||
# Precompute the CSR and CSC representations so each subprocess does not
|
||||
# duplicate.
|
||||
if num_workers > 0:
|
||||
g.create_formats_()
|
||||
self.device = device
|
||||
|
||||
def __iter__(self):
|
||||
"""Return the iterator of the data loader."""
|
||||
if self.is_distributed:
|
||||
# Directly use the iterator of DistDataLoader, which doesn't copy features anyway.
|
||||
return iter(self.dataloader)
|
||||
else:
|
||||
return _NodeDataLoaderIter(self)
|
||||
|
||||
def __len__(self):
|
||||
"""Return the number of batches of the data loader."""
|
||||
return len(self.dataloader)
|
||||
return _NodeDataLoaderIter(self, super().__iter__())
|
||||
|
||||
def set_epoch(self, epoch):
|
||||
"""Sets the epoch number for the underlying sampler which ensures all replicas
|
||||
@@ -658,7 +627,7 @@ class NodeDataLoader:
|
||||
else:
|
||||
raise DGLError('set_epoch is only available when use_ddp is True.')
|
||||
|
||||
class EdgeDataLoader:
|
||||
class EdgeDataLoader(DataLoader):
|
||||
"""PyTorch dataloader for batch-iterating over a set of edges, generating the list
|
||||
of message flow graphs (MFGs) as computation dependency of the said minibatch for
|
||||
edge classification, edge regression, and link prediction.
|
||||
@@ -866,8 +835,9 @@ class EdgeDataLoader:
|
||||
* Link prediction on heterogeneous graph: RGCN for link prediction.
|
||||
"""
|
||||
collator_arglist = inspect.getfullargspec(EdgeCollator).args
|
||||
|
||||
def __init__(self, g, eids, graph_sampler, device='cpu', use_ddp=False, ddp_seed=0, **kwargs):
|
||||
def __init__(self, g, eids, graph_sampler, device='cpu', use_ddp=False, ddp_seed=0,
|
||||
**kwargs):
|
||||
_check_graph_type(g)
|
||||
collator_kwargs = {}
|
||||
dataloader_kwargs = {}
|
||||
for k, v in kwargs.items():
|
||||
@@ -876,53 +846,30 @@ class EdgeDataLoader:
|
||||
else:
|
||||
dataloader_kwargs[k] = v
|
||||
|
||||
if isinstance(g, DistGraph):
|
||||
if device is None:
|
||||
# for the distributed case default to the CPU
|
||||
device = 'cpu'
|
||||
assert device == 'cpu', 'Only cpu is supported in the case of a DistGraph.'
|
||||
# Distributed DataLoader currently does not support heterogeneous graphs
|
||||
# and does not copy features. Fallback to normal solution
|
||||
self.collator = EdgeCollator(g, eids, graph_sampler, **collator_kwargs)
|
||||
_remove_kwargs_dist(dataloader_kwargs)
|
||||
self.dataloader = DistDataLoader(self.collator.dataset,
|
||||
collate_fn=self.collator.collate,
|
||||
**dataloader_kwargs)
|
||||
self.is_distributed = True
|
||||
else:
|
||||
if device is None:
|
||||
# default to the same device the graph is on
|
||||
device = th.device(g.device)
|
||||
|
||||
# if the sampler supports it, tell it to output to the
|
||||
# specified device
|
||||
num_workers = dataloader_kwargs.get('num_workers', 0)
|
||||
if callable(getattr(graph_sampler, "set_output_context", None)) and num_workers == 0:
|
||||
graph_sampler.set_output_context(to_dgl_context(device))
|
||||
# if the sampler supports it, tell it to output to the
|
||||
# specified device
|
||||
num_workers = dataloader_kwargs.get('num_workers', 0)
|
||||
if callable(getattr(graph_sampler, "set_output_context", None)) and num_workers == 0:
|
||||
graph_sampler.set_output_context(to_dgl_context(device))
|
||||
|
||||
self.collator = _EdgeCollator(g, eids, graph_sampler, **collator_kwargs)
|
||||
self.use_scalar_batcher, self.scalar_batcher, self.dataloader, self.dist_sampler = \
|
||||
_init_dataloader(self.collator, device, dataloader_kwargs, use_ddp, ddp_seed)
|
||||
self.use_ddp = use_ddp
|
||||
self.is_distributed = False
|
||||
self.collator = EdgeCollator(g, eids, graph_sampler, **collator_kwargs)
|
||||
self.use_scalar_batcher, self.scalar_batcher, dataset, collator, self.dist_sampler = \
|
||||
_init_dataloader(self.collator, device, dataloader_kwargs, use_ddp, ddp_seed)
|
||||
self.use_ddp = use_ddp
|
||||
super().__init__(dataset, collate_fn=collator.collate, **dataloader_kwargs)
|
||||
|
||||
# Precompute the CSR and CSC representations so each subprocess does not duplicate.
|
||||
if num_workers > 0:
|
||||
g.create_formats_()
|
||||
# Precompute the CSR and CSC representations so each subprocess does not duplicate.
|
||||
if num_workers > 0:
|
||||
g.create_formats_()
|
||||
|
||||
self.device = device
|
||||
|
||||
def __iter__(self):
|
||||
"""Return the iterator of the data loader."""
|
||||
if self.is_distributed:
|
||||
# Directly use the iterator of DistDataLoader, which doesn't copy features anyway.
|
||||
return iter(self.dataloader)
|
||||
else:
|
||||
return _EdgeDataLoaderIter(self)
|
||||
|
||||
def __len__(self):
|
||||
"""Return the number of batches of the data loader."""
|
||||
return len(self.dataloader)
|
||||
return _EdgeDataLoaderIter(self, super().__iter__())
|
||||
|
||||
def set_epoch(self, epoch):
|
||||
"""Sets the epoch number for the underlying sampler which ensures all replicas
|
||||
@@ -945,7 +892,7 @@ class EdgeDataLoader:
|
||||
else:
|
||||
raise DGLError('set_epoch is only available when use_ddp is True.')
|
||||
|
||||
class GraphDataLoader:
|
||||
class GraphDataLoader(DataLoader):
|
||||
"""PyTorch dataloader for batch-iterating over a set of graphs, generating the batched
|
||||
graph and corresponding label tensor (if provided) of the said minibatch.
|
||||
|
||||
@@ -992,7 +939,6 @@ class GraphDataLoader:
|
||||
... train_on(batched_graph, labels)
|
||||
"""
|
||||
collator_arglist = inspect.getfullargspec(GraphCollator).args
|
||||
|
||||
def __init__(self, dataset, collate_fn=None, use_ddp=False, ddp_seed=0, **kwargs):
|
||||
collator_kwargs = {}
|
||||
dataloader_kwargs = {}
|
||||
@@ -1027,14 +973,11 @@ class GraphDataLoader:
|
||||
if use_ddp:
|
||||
self.dist_sampler = _create_dist_sampler(dataset, dataloader_kwargs, ddp_seed)
|
||||
dataloader_kwargs['sampler'] = self.dist_sampler
|
||||
|
||||
self.dataloader = DataLoader(dataset=dataset,
|
||||
collate_fn=self.collate,
|
||||
**dataloader_kwargs)
|
||||
super().__init__(dataset, collate_fn=self.collate, **dataloader_kwargs)
|
||||
|
||||
def __iter__(self):
|
||||
"""Return the iterator of the data loader."""
|
||||
return _GraphDataLoaderIter(self)
|
||||
return _GraphDataLoaderIter(self, super().__iter__())
|
||||
|
||||
def __len__(self):
|
||||
"""Return the number of batches of the data loader."""
|
||||
|
||||
102
python/dgl/dataloading/pytorch/dist_dataloader.py
Normal file
102
python/dgl/dataloading/pytorch/dist_dataloader.py
Normal file
@@ -0,0 +1,102 @@
|
||||
"""Distributed dataloaders.
|
||||
"""
|
||||
import inspect
|
||||
from ...distributed import DistDataLoader
|
||||
from ..dataloader import NodeCollator, EdgeCollator
|
||||
|
||||
def _remove_kwargs_dist(kwargs):
|
||||
if 'num_workers' in kwargs:
|
||||
del kwargs['num_workers']
|
||||
if 'pin_memory' in kwargs:
|
||||
del kwargs['pin_memory']
|
||||
print('Distributed DataLoaders do not support pin_memory.')
|
||||
return kwargs
|
||||
|
||||
class DistNodeDataLoader(DistDataLoader):
|
||||
"""PyTorch dataloader for batch-iterating over a set of nodes, generating the list
|
||||
of message flow graphs (MFGs) as computation dependency of the said minibatch, on
|
||||
a distributed graph.
|
||||
|
||||
All the arguments have the same meaning as the single-machine counterpart
|
||||
:class:`dgl.dataloading.pytorch.NodeDataLoader` except the first argument
|
||||
:attr:`g` which must be a :class:`dgl.distributed.DistGraph`.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
g : DistGraph
|
||||
The distributed graph.
|
||||
|
||||
nids, graph_sampler, device, kwargs :
|
||||
See :class:`dgl.dataloading.pytorch.NodeDataLoader`.
|
||||
|
||||
See also
|
||||
--------
|
||||
dgl.dataloading.pytorch.NodeDataLoader
|
||||
"""
|
||||
def __init__(self, g, nids, graph_sampler, device=None, **kwargs):
|
||||
collator_kwargs = {}
|
||||
dataloader_kwargs = {}
|
||||
_collator_arglist = inspect.getfullargspec(NodeCollator).args
|
||||
for k, v in kwargs.items():
|
||||
if k in _collator_arglist:
|
||||
collator_kwargs[k] = v
|
||||
else:
|
||||
dataloader_kwargs[k] = v
|
||||
if device is None:
|
||||
# for the distributed case default to the CPU
|
||||
device = 'cpu'
|
||||
assert device == 'cpu', 'Only cpu is supported in the case of a DistGraph.'
|
||||
# Distributed DataLoader currently does not support heterogeneous graphs
|
||||
# and does not copy features. Fallback to normal solution
|
||||
self.collator = NodeCollator(g, nids, graph_sampler, **collator_kwargs)
|
||||
_remove_kwargs_dist(dataloader_kwargs)
|
||||
super().__init__(self.collator.dataset,
|
||||
collate_fn=self.collator.collate,
|
||||
**dataloader_kwargs)
|
||||
self.device = device
|
||||
|
||||
class DistEdgeDataLoader(DistDataLoader):
|
||||
"""PyTorch dataloader for batch-iterating over a set of edges, generating the list
|
||||
of message flow graphs (MFGs) as computation dependency of the said minibatch for
|
||||
edge classification, edge regression, and link prediction, on a distributed
|
||||
graph.
|
||||
|
||||
All the arguments have the same meaning as the single-machine counterpart
|
||||
:class:`dgl.dataloading.pytorch.EdgeDataLoader` except the first argument
|
||||
:attr:`g` which must be a :class:`dgl.distributed.DistGraph`.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
g : DistGraph
|
||||
The distributed graph.
|
||||
|
||||
eids, graph_sampler, device, kwargs :
|
||||
See :class:`dgl.dataloading.pytorch.EdgeDataLoader`.
|
||||
|
||||
See also
|
||||
--------
|
||||
dgl.dataloading.pytorch.EdgeDataLoader
|
||||
"""
|
||||
def __init__(self, g, eids, graph_sampler, device=None, **kwargs):
|
||||
collator_kwargs = {}
|
||||
dataloader_kwargs = {}
|
||||
_collator_arglist = inspect.getfullargspec(EdgeCollator).args
|
||||
for k, v in kwargs.items():
|
||||
if k in _collator_arglist:
|
||||
collator_kwargs[k] = v
|
||||
else:
|
||||
dataloader_kwargs[k] = v
|
||||
|
||||
if device is None:
|
||||
# for the distributed case default to the CPU
|
||||
device = 'cpu'
|
||||
assert device == 'cpu', 'Only cpu is supported in the case of a DistGraph.'
|
||||
# Distributed DataLoader currently does not support heterogeneous graphs
|
||||
# and does not copy features. Fallback to normal solution
|
||||
self.collator = EdgeCollator(g, eids, graph_sampler, **collator_kwargs)
|
||||
_remove_kwargs_dist(dataloader_kwargs)
|
||||
super().__init__(self.collator.dataset,
|
||||
collate_fn=self.collator.collate,
|
||||
**dataloader_kwargs)
|
||||
|
||||
self.device = device
|
||||
@@ -148,14 +148,14 @@ def start_dist_neg_dataloader(rank, tmpdir, num_server, num_workers, orig_nid, g
|
||||
num_negs = 5
|
||||
sampler = dgl.dataloading.MultiLayerNeighborSampler([5,10])
|
||||
negative_sampler=dgl.dataloading.negative_sampler.Uniform(num_negs)
|
||||
dataloader = dgl.dataloading.EdgeDataLoader(dist_graph,
|
||||
train_eid,
|
||||
sampler,
|
||||
batch_size=batch_size,
|
||||
negative_sampler=negative_sampler,
|
||||
shuffle=True,
|
||||
drop_last=False,
|
||||
num_workers=num_workers)
|
||||
dataloader = dgl.dataloading.DistEdgeDataLoader(dist_graph,
|
||||
train_eid,
|
||||
sampler,
|
||||
batch_size=batch_size,
|
||||
negative_sampler=negative_sampler,
|
||||
shuffle=True,
|
||||
drop_last=False,
|
||||
num_workers=num_workers)
|
||||
for _ in range(2):
|
||||
for _, (_, pos_graph, neg_graph, blocks) in zip(range(0, num_edges_to_sample, batch_size), dataloader):
|
||||
block = blocks[-1]
|
||||
@@ -283,7 +283,7 @@ def start_node_dataloader(rank, tmpdir, num_server, num_workers, orig_nid, orig_
|
||||
# We need to test creating DistDataLoader multiple times.
|
||||
for i in range(2):
|
||||
# Create DataLoader for constructing blocks
|
||||
dataloader = dgl.dataloading.NodeDataLoader(
|
||||
dataloader = dgl.dataloading.DistNodeDataLoader(
|
||||
dist_graph,
|
||||
train_nid,
|
||||
sampler,
|
||||
@@ -334,7 +334,7 @@ def start_edge_dataloader(rank, tmpdir, num_server, num_workers, orig_nid, orig_
|
||||
# We need to test creating DistDataLoader multiple times.
|
||||
for i in range(2):
|
||||
# Create DataLoader for constructing blocks
|
||||
dataloader = dgl.dataloading.EdgeDataLoader(
|
||||
dataloader = dgl.dataloading.DistEdgeDataLoader(
|
||||
dist_graph,
|
||||
train_eid,
|
||||
sampler,
|
||||
|
||||
9
tutorials/dist/1_node_classification.py
vendored
9
tutorials/dist/1_node_classification.py
vendored
@@ -265,7 +265,8 @@ Pytorch's `DistributedDataParallel`.
|
||||
Distributed mini-batch sampler
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
We can use the same `NodeDataLoader` to create a distributed mini-batch sampler for
|
||||
We can use the same :class:`~dgl.dataloading.pytorch.DistNodeDataLoader`, the distributed counterpart
|
||||
of :class:`~dgl.dataloading.pytorch.NodeDataLoader`, to create a distributed mini-batch sampler for
|
||||
node classification.
|
||||
|
||||
|
||||
@@ -274,10 +275,10 @@ node classification.
|
||||
.. code-block:: python
|
||||
|
||||
sampler = dgl.dataloading.MultiLayerNeighborSampler([25,10])
|
||||
train_dataloader = dgl.dataloading.NodeDataLoader(
|
||||
train_dataloader = dgl.dataloading.DistNodeDataLoader(
|
||||
g, train_nid, sampler, batch_size=1024,
|
||||
shuffle=True, drop_last=False)
|
||||
valid_dataloader = dgl.dataloading.NodeDataLoader(
|
||||
valid_dataloader = dgl.dataloading.DistNodeDataLoader(
|
||||
g, valid_nid, sampler, batch_size=1024,
|
||||
shuffle=False, drop_last=False)
|
||||
|
||||
@@ -432,4 +433,4 @@ If we split the graph into four partitions as demonstrated at the beginning of t
|
||||
ip_addr3
|
||||
ip_addr4
|
||||
|
||||
'''
|
||||
'''
|
||||
|
||||
Reference in New Issue
Block a user