[Sampling] Separate DistNodeDataLoader from NodeDataLoader (#3578)

* separate distdataloader from node/edgedataloader

* oops

* docs

* change user guide

* changes

* revert

* fix
Quan (Andy) Gan
2021-12-13 11:25:25 +08:00
committed by GitHub
parent 4518698237
commit fa9f494c5c
13 changed files with 233 additions and 181 deletions

View File

@@ -17,6 +17,8 @@ and an ``EdgeDataLoader`` for edge/link prediction task.
.. autoclass:: NodeDataLoader
.. autoclass:: EdgeDataLoader
.. autoclass:: GraphDataLoader
.. autoclass:: DistNodeDataLoader
.. autoclass:: DistEdgeDataLoader
.. _api-dataloading-neighbor-sampling:

View File

@@ -202,20 +202,20 @@ DGL provides two levels of APIs for sampling nodes and edges to generate mini-ba
(see the section of mini-batch training). The low-level APIs require users to write code
to explicitly define how a layer of nodes is sampled (e.g., using :func:`dgl.sampling.sample_neighbors` ).
The high-level sampling APIs implement a few popular sampling algorithms for node classification
and link prediction tasks (e.g., :class:`~dgl.dataloading.pytorch.NodeDataloader` and
:class:`~dgl.dataloading.pytorch.EdgeDataloader` ).
and link prediction tasks (e.g., :class:`~dgl.dataloading.pytorch.NodeDataLoader` and
:class:`~dgl.dataloading.pytorch.EdgeDataLoader` ).
The distributed sampling module follows the same design and provides two levels of sampling APIs.
For the lower-level sampling API, it provides :func:`~dgl.distributed.sample_neighbors` for
distributed neighborhood sampling on :class:`~dgl.distributed.DistGraph`. In addition, DGL provides
a distributed Dataloader (:class:`~dgl.distributed.DistDataLoader` ) for distributed sampling.
The distributed Dataloader has the same interface as Pytorch DataLoader except that users cannot
a distributed DataLoader (:class:`~dgl.distributed.DistDataLoader` ) for distributed sampling.
The distributed DataLoader has the same interface as Pytorch DataLoader except that users cannot
specify the number of worker processes when creating a dataloader. The worker processes are created
in :func:`dgl.distributed.initialize`.
**Note**: When running :func:`dgl.distributed.sample_neighbors` on :class:`~dgl.distributed.DistGraph`,
the sampler cannot run in Pytorch Dataloader with multiple worker processes. The main reason is that
Pytorch Dataloader creates new sampling worker processes in every epoch, which leads to creating and
the sampler cannot run in Pytorch DataLoader with multiple worker processes. The main reason is that
Pytorch DataLoader creates new sampling worker processes in every epoch, which leads to creating and
destroying :class:`~dgl.distributed.DistGraph` objects many times.
When using the low-level API, the sampling code is similar to single-process sampling. The only
@@ -240,17 +240,17 @@ difference is that users need to use :func:`dgl.distributed.sample_neighbors` an
for batch in dataloader:
...
The same high-level sampling APIs (:class:`~dgl.dataloading.pytorch.NodeDataloader` and
:class:`~dgl.dataloading.pytorch.EdgeDataloader` ) work for both :class:`~dgl.DGLGraph`
and :class:`~dgl.distributed.DistGraph`. When using :class:`~dgl.dataloading.pytorch.NodeDataloader`
and :class:`~dgl.dataloading.pytorch.EdgeDataloader`, the distributed sampling code is exactly
the same as single-process sampling.
The high-level sampling APIs (:class:`~dgl.dataloading.pytorch.NodeDataLoader` and
:class:`~dgl.dataloading.pytorch.EdgeDataLoader` ) have distributed counterparts
(:class:`~dgl.dataloading.pytorch.DistNodeDataLoader` and
:class:`~dgl.dataloading.pytorch.DistEdgeDataLoader`). The code is exactly the
same as single-process sampling otherwise.
.. code:: python
sampler = dgl.dataloading.MultiLayerNeighborSampler([10, 25])
dataloader = dgl.dataloading.NodeDataLoader(g, train_nid, sampler,
batch_size=batch_size, shuffle=True)
dataloader = dgl.dataloading.DistNodeDataLoader(g, train_nid, sampler,
batch_size=batch_size, shuffle=True)
for batch in dataloader:
...

View File

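@@ -177,9 +177,9 @@ DGL provides a sparse Adagrad optimizer :class:`~dgl.distributed.SparseAdagr
DGL provides two levels of APIs for sampling nodes and edges to generate mini-batch training data (see the chapter on mini-batch training).
The low-level APIs require users to write code that explicitly defines how a layer of nodes is sampled (for example, using :func:`dgl.sampling.sample_neighbors`).
The high-level sampling APIs implement a few popular sampling algorithms for node classification and link prediction tasks (for example,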
:class:`~dgl.dataloading.pytorch.NodeDataloader`
:class:`~dgl.dataloading.pytorch.NodeDataLoader`
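:class:`~dgl.dataloading.pytorch.EdgeDataloader`).
:class:`~dgl.dataloading.pytorch.EdgeDataLoader`).
The distributed sampling module follows the same design and also provides two levels of sampling APIs. For the low-level sampling API,
for distributed neighbor sampling on :class:`~dgl.distributed.DistGraph`, it provides
@@ -188,7 +188,7 @@ DGL provides two levels of APIs for sampling nodes and edges to generate mini
The distributed data loader has the same interface as the PyTorch DataLoader. Its worker processes are created in :func:`dgl.distributed.initialize`.
**Note**: When running :func:`dgl.distributed.sample_neighbors` on :class:`~dgl.distributed.DistGraph`,
the sampler cannot run in a PyTorch Dataloader with multiple worker processes. The main reason is that the PyTorch Dataloader creates new sampling worker processes in every epoch,
the sampler cannot run in a PyTorch DataLoader with multiple worker processes. The main reason is that the PyTorch DataLoader creates new sampling worker processes in every epoch,
which leads to creating and destroying :class:`~dgl.distributed.DistGraph` objects many times.
When using the low-level API, the sampling code is similar to single-process sampling. The only difference is that users need to use
@@ -214,19 +214,19 @@ DGL provides two levels of APIs for sampling nodes and edges to generate mini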
for batch in dataloader:
...
Both :class:`~dgl.DGLGraph` and :class:`~dgl.distributed.DistGraph` can use the same high-level sampling APIs (
:class:`~dgl.dataloading.pytorch.NodeDataloader`
:class:`~dgl.dataloading.pytorch.NodeDataLoader`
:class:`~dgl.dataloading.pytorch.EdgeDataloader`). When using
:class:`~dgl.dataloading.pytorch.NodeDataloader`
:class:`~dgl.dataloading.pytorch.EdgeDataLoader` have distributed versions
:class:`~dgl.dataloading.pytorch.DistNodeDataLoader`
:class:`~dgl.dataloading.pytorch.EdgeDataloader`, the distributed sampling code is exactly the same as single-process sampling
:class:`~dgl.dataloading.pytorch.DistEdgeDataLoader`. When using
them, the distributed sampling code is almost exactly the same as single-process sampling.
.. code:: python
sampler = dgl.dataloading.MultiLayerNeighborSampler([10, 25])
dataloader = dgl.dataloading.NodeDataLoader(g, train_nid, sampler,
batch_size=batch_size, shuffle=True)
dataloader = dgl.dataloading.DistNodeDataLoader(g, train_nid, sampler,
batch_size=batch_size, shuffle=True)
for batch in dataloader:
...

View File

@@ -132,8 +132,8 @@ For transductive models that require node embeddings, DGL
Distributed sampling
~~~~~~~~~~~~~~~~~~~~
DGL provides two levels of APIs for sampling nodes and edges to generate mini-batches (see the mini-batch training section). The low-level APIs require users to write code that explicitly defines how a layer of nodes is sampled (for example, using :func:`dgl.sampling.sample_neighbors`). The high-level APIs implement several popular sampling algorithms for node classification and link prediction (e.g., :class:`~dgl.dataloading.pytorch.NodeDataloader` and
:class:`~dgl.dataloading.pytorch.EdgeDataloader`).
DGL provides two levels of APIs for sampling nodes and edges to generate mini-batches (see the mini-batch training section). The low-level APIs require users to write code that explicitly defines how a layer of nodes is sampled (for example, using :func:`dgl.sampling.sample_neighbors`). The high-level APIs implement several popular sampling algorithms for node classification and link prediction (e.g., :class:`~dgl.dataloading.pytorch.NodeDataLoader` and
:class:`~dgl.dataloading.pytorch.EdgeDataLoader`).
The distributed sampling module follows the same design and provides two levels of sampling APIs. For the low-level sampling API, there is :func:`~dgl.distributed.sample_neighbors` for distributed neighbor sampling on :class:`~dgl.distributed.DistGraph`. In addition, DGL provides a distributed data loader, :class:`~dgl.distributed.DistDataLoader`, for distributed sampling. The distributed DataLoader has the same interface as the PyTorch DataLoader, except that users cannot specify the number of worker processes when creating the data loader. The worker processes are created in :func:`dgl.distributed.initialize`.
@@ -159,13 +159,13 @@ When using the low-level API, the sampling code is similar to single-process sampl
for batch in dataloader:
...
The same high-level sampling APIs (:class:`~dgl.dataloading.pytorch.NodeDataloader` and :class:`~dgl.dataloading.pytorch.EdgeDataloader` ) work for both :class:`~dgl.DGLGraph` and :class:`~dgl.distributed.DistGraph`. When using :class:`~dgl.dataloading.pytorch.NodeDataloader` and :class:`~dgl.dataloading.pytorch.EdgeDataloader`, the distributed sampling code is exactly the same as the single-process sampling code.
The same high-level sampling APIs (:class:`~dgl.dataloading.pytorch.NodeDataLoader` and :class:`~dgl.dataloading.pytorch.EdgeDataLoader` ) work for both :class:`~dgl.DGLGraph` and :class:`~dgl.distributed.DistGraph`. When using :class:`~dgl.dataloading.pytorch.NodeDataLoader` and :class:`~dgl.dataloading.pytorch.EdgeDataLoader`, the distributed sampling code is exactly the same as the single-process sampling code.
.. code:: python
sampler = dgl.dataloading.MultiLayerNeighborSampler([10, 25])
dataloader = dgl.dataloading.NodeDataLoader(g, train_nid, sampler,
batch_size=batch_size, shuffle=True)
dataloader = dgl.dataloading.DistNodeDataLoader(g, train_nid, sampler,
batch_size=batch_size, shuffle=True)
for batch in dataloader:
...

View File

@@ -69,7 +69,7 @@ class SAGE(nn.Module):
y = th.zeros(g.number_of_nodes(), self.n_hidden if l != len(self.layers) - 1 else self.n_classes)
sampler = dgl.dataloading.MultiLayerNeighborSampler([None])
dataloader = dgl.dataloading.NodeDataLoader(
dataloader = dgl.dataloading.DistNodeDataLoader(
g,
th.arange(g.number_of_nodes()),
sampler,

View File

@@ -366,7 +366,7 @@ def run(args, device, data):
val_fanouts = [int(fanout) for fanout in args.validation_fanout.split(',')]
sampler = dgl.dataloading.MultiLayerNeighborSampler(fanouts)
dataloader = dgl.dataloading.NodeDataLoader(
dataloader = dgl.dataloading.DistNodeDataLoader(
g,
{'paper': train_nid},
sampler,
@@ -375,7 +375,7 @@ def run(args, device, data):
drop_last=False)
valid_sampler = dgl.dataloading.MultiLayerNeighborSampler(val_fanouts)
valid_dataloader = dgl.dataloading.NodeDataLoader(
valid_dataloader = dgl.dataloading.DistNodeDataLoader(
g,
{'paper': val_nid},
valid_sampler,
@@ -384,7 +384,7 @@ def run(args, device, data):
drop_last=False)
test_sampler = dgl.dataloading.MultiLayerNeighborSampler(val_fanouts)
test_dataloader = dgl.dataloading.NodeDataLoader(
test_dataloader = dgl.dataloading.DistNodeDataLoader(
g,
{'paper': test_nid},
test_sampler,

View File

@@ -287,6 +287,9 @@ class TemporalEdgeDataLoader(dgl.dataloading.EdgeDataLoader):
if dataloader_kwargs.get('num_workers', 0) > 0:
g.create_formats_()
def __iter__(self):
return iter(self.dataloader)
# ====== Fast Mode ======
# Part of code in reservoir sampling comes from PyG library

View File

@@ -292,7 +292,7 @@ if __name__ == "__main__":
if i < args.epochs-1 and args.fast_mode:
sampler.reset()
print(log_content[0], log_content[1], log_content[2])
except:
except KeyboardInterrupt:
traceback.print_exc()
error_content = "Training Interrupted!"
f.writelines(error_content)
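
Reviewer note on the hunk above: narrowing the bare `except:` to `except KeyboardInterrupt:` means only a user interrupt is logged as an interrupted run, while genuine errors now propagate. The sketch below illustrates that behaviour in isolation; `run_step` and `train` are hypothetical names used only for this illustration and are not part of the example script.

```python
def run_step(step):
    """Hypothetical training step that can fail in two different ways."""
    if step == "interrupt":
        raise KeyboardInterrupt
    if step == "bug":
        raise ValueError("bad batch")
    return "ok"


def train(step):
    try:
        return run_step(step)
    except KeyboardInterrupt:
        # Only a user interrupt is handled and reported here.
        return "Training Interrupted!"


print(train("ok"))
print(train("interrupt"))
try:
    train("bug")
except ValueError as err:
    # The ValueError is not swallowed by the handler in train().
    print("propagated:", err)
```

With a bare `except:`, the `ValueError` would have been reported as an interruption too, which hides real bugs.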

View File

@@ -1,2 +1,3 @@
"""DGL PyTorch DataLoader module."""
from .dataloader import *
from .dist_dataloader import *

View File

@@ -6,9 +6,8 @@ import torch as th
from torch.utils.data import DataLoader, IterableDataset
from torch.utils.data.distributed import DistributedSampler
import torch.distributed as dist
from ..dataloader import NodeCollator, EdgeCollator, GraphCollator, SubgraphIterator, BlockSampler
from ...distributed import DistGraph
from ...distributed import DistDataLoader
from ..dataloader import NodeCollator, EdgeCollator, GraphCollator, SubgraphIterator, BlockSampler
from ...ndarray import NDArray as DGLNDArray
from ... import backend as F
from ...base import DGLError
@@ -24,6 +23,10 @@ PYTORCH_VER = LooseVersion(th.__version__)
PYTORCH_16 = PYTORCH_VER >= LooseVersion("1.6.0")
PYTORCH_17 = PYTORCH_VER >= LooseVersion("1.7.0")
def _check_graph_type(g):
if isinstance(g, DistGraph):
raise TypeError("Please use DistNodeDataLoader or DistEdgeDataLoader for DistGraph")
def _create_dist_sampler(dataset, dataloader_kwargs, ddp_seed):
# Note: will change the content of dataloader_kwargs
dist_sampler_kwargs = {'shuffle': dataloader_kwargs['shuffle']}
@@ -164,14 +167,6 @@ class _ScalarDataBatcher(th.utils.data.IterableDataset):
"""Set epoch number for distributed training."""
self.epoch = epoch
def _remove_kwargs_dist(kwargs):
if 'num_workers' in kwargs:
del kwargs['num_workers']
if 'pin_memory' in kwargs:
del kwargs['pin_memory']
print('Distributed DataLoader does not support pin_memory')
return kwargs
# The following code is a fix to the PyTorch-specific issue in
# https://github.com/dmlc/dgl/issues/2137
#
@@ -288,14 +283,14 @@ def _restore_storages(subgs, g):
_restore_subgraph_storage(subg, g)
class _NodeCollator(NodeCollator):
def collate(self, items):
def collate(self, items): # pylint: disable=missing-docstring
# input_nodes, output_nodes, blocks
result = super().collate(items)
_pop_storages(result[-1], self.g)
return result
class _EdgeCollator(EdgeCollator):
def collate(self, items):
def collate(self, items): # pylint: disable=missing-docstring
if self.negative_sampler is None:
# input_nodes, pair_graph, blocks
result = super().collate(items)
@@ -325,10 +320,10 @@ def _to_device(data, device):
return recursive_apply(data, lambda x: x.to(device))
class _NodeDataLoaderIter:
def __init__(self, node_dataloader):
def __init__(self, node_dataloader, iter_):
self.device = node_dataloader.device
self.node_dataloader = node_dataloader
self.iter_ = iter(node_dataloader.dataloader)
self.iter_ = iter_
self.async_load = node_dataloader.async_load
if self.async_load:
if F.device_type(self.device) != 'cuda':
@@ -350,10 +345,10 @@ class _NodeDataLoaderIter:
return result
class _EdgeDataLoaderIter:
def __init__(self, edge_dataloader):
def __init__(self, edge_dataloader, iter_):
self.device = edge_dataloader.device
self.edge_dataloader = edge_dataloader
self.iter_ = iter(edge_dataloader.dataloader)
self.iter_ = iter_
# Make this an iterator for PyTorch Lightning compatibility
def __iter__(self):
@@ -373,9 +368,9 @@ class _EdgeDataLoaderIter:
return result
class _GraphDataLoaderIter:
def __init__(self, graph_dataloader):
def __init__(self, graph_dataloader, iter_):
self.dataloader = graph_dataloader
self.iter_ = iter(graph_dataloader.dataloader)
self.iter_ = iter_
def __iter__(self):
return self
@@ -422,14 +417,9 @@ def _init_dataloader(collator, device, dataloader_kwargs, use_ddp, ddp_seed):
else:
dist_sampler = None
dataloader = DataLoader(
dataset,
collate_fn=collator.collate,
**dataloader_kwargs)
return use_scalar_batcher, scalar_batcher, dataset, collator, dist_sampler
return use_scalar_batcher, scalar_batcher, dataloader, dist_sampler
class NodeDataLoader:
class NodeDataLoader(DataLoader):
"""PyTorch dataloader for batch-iterating over a set of nodes, generating the list
of message flow graphs (MFGs) as computation dependency of the said minibatch.
@@ -555,10 +545,10 @@ class NodeDataLoader:
single-GPU and the whole graph does not fit in GPU memory.
"""
collator_arglist = inspect.getfullargspec(NodeCollator).args
def __init__(self, g, nids, graph_sampler, device=None, use_ddp=False, ddp_seed=0,
load_input=None, load_output=None, load_ndata=None, load_edata=None,
async_load=False, **kwargs):
_check_graph_type(g)
collator_kwargs = {}
dataloader_kwargs = {}
for k, v in kwargs.items():
@@ -567,75 +557,54 @@ class NodeDataLoader:
else:
dataloader_kwargs[k] = v
if isinstance(g, DistGraph):
if device is None:
# for the distributed case default to the CPU
device = 'cpu'
assert device == 'cpu', 'Only cpu is supported in the case of a DistGraph.'
# Distributed DataLoader currently does not support heterogeneous graphs
# and does not copy features. Fallback to normal solution
self.collator = NodeCollator(g, nids, graph_sampler, **collator_kwargs)
_remove_kwargs_dist(dataloader_kwargs)
self.dataloader = DistDataLoader(self.collator.dataset,
collate_fn=self.collator.collate,
**dataloader_kwargs)
self.is_distributed = True
if device is None:
# default to the same device the graph is on
device = th.device(g.device)
self.async_load = async_load
self.load_input = {} if load_input is None else load_input
self.load_output = {} if load_output is None else load_output
self.load_ndata = {} if load_ndata is None else load_ndata
# TODO(BarclayII): The samplers may not always return edge IDs due to
# efficiency concerns (see the comment in BlockSampler.__init__()).
# However, the async copy wrapper relies on EID in the returned subgraph
# to fetch edge features. Therefore to make edge feature fetching work,
# return_eids must be True. If we have a graph-level lazy index implemented
# we can potentially relax this constraint.
if (async_load and load_edata is not None and
not getattr(graph_sampler, "return_eids", True)):
raise DGLError("sampler's return_eids must be True if load_edata is not None")
self.load_edata = {} if load_edata is None else load_edata
# if the sampler supports it, tell it to output to the specified device.
# But if async_load is enabled, set_output_context should be skipped as
# we'd like to avoid any graph/data transfer graphs across devices in
# sampler. Such transfer will be handled in dataloader.
num_workers = dataloader_kwargs.get('num_workers', 0)
if ((not async_load) and
callable(getattr(graph_sampler, "set_output_context", None)) and
num_workers == 0):
graph_sampler.set_output_context(to_dgl_context(device))
else:
if device is None:
# default to the same device the graph is on
device = th.device(g.device)
graph_sampler.set_output_context(to_dgl_context(th.device('cpu')))
self.async_load = async_load
self.load_input = {} if load_input is None else load_input
self.load_output = {} if load_output is None else load_output
self.load_ndata = {} if load_ndata is None else load_ndata
# TODO(BarclayII): The samplers may not always return edge IDs due to
# efficiency concerns (see the comment in BlockSampler.__init__()).
# However, the async copy wrapper relies on EID in the returned subgraph
# to fetch edge features. Therefore to make edge feature fetching work,
# return_eids must be True. If we have a graph-level lazy index implemented
# we can potentially relax this constraint.
if (async_load and load_edata is not None and
not getattr(graph_sampler, "return_eids", True)):
raise DGLError("sampler's return_eids must be True if load_edata is not None")
self.load_edata = {} if load_edata is None else load_edata
self.collator = _NodeCollator(g, nids, graph_sampler, **collator_kwargs)
self.use_scalar_batcher, self.scalar_batcher, dataset, collator, self.dist_sampler = \
_init_dataloader(self.collator, device, dataloader_kwargs, use_ddp, ddp_seed)
super().__init__(dataset, collate_fn=collator.collate, **dataloader_kwargs)
# if the sampler supports it, tell it to output to the specified device.
# But if async_load is enabled, set_output_context should be skipped as
# we'd like to avoid any graph/data transfer graphs across devices in
# sampler. Such transfer will be handled in dataloader.
num_workers = dataloader_kwargs.get('num_workers', 0)
if ((not async_load) and
callable(getattr(graph_sampler, "set_output_context", None)) and
num_workers == 0):
graph_sampler.set_output_context(to_dgl_context(device))
else:
graph_sampler.set_output_context(to_dgl_context(th.device('cpu')))
self.use_ddp = use_ddp
self.is_distributed = False
self.collator = _NodeCollator(g, nids, graph_sampler, **collator_kwargs)
self.use_scalar_batcher, self.scalar_batcher, self.dataloader, self.dist_sampler = \
_init_dataloader(self.collator, device, dataloader_kwargs, use_ddp, ddp_seed)
self.use_ddp = use_ddp
self.is_distributed = False
# Precompute the CSR and CSC representations so each subprocess does not
# duplicate.
if num_workers > 0:
g.create_formats_()
# Precompute the CSR and CSC representations so each subprocess does not
# duplicate.
if num_workers > 0:
g.create_formats_()
self.device = device
def __iter__(self):
"""Return the iterator of the data loader."""
if self.is_distributed:
# Directly use the iterator of DistDataLoader, which doesn't copy features anyway.
return iter(self.dataloader)
else:
return _NodeDataLoaderIter(self)
def __len__(self):
"""Return the number of batches of the data loader."""
return len(self.dataloader)
return _NodeDataLoaderIter(self, super().__iter__())
def set_epoch(self, epoch):
"""Sets the epoch number for the underlying sampler which ensures all replicas
@@ -658,7 +627,7 @@ class NodeDataLoader:
else:
raise DGLError('set_epoch is only available when use_ddp is True.')
class EdgeDataLoader:
class EdgeDataLoader(DataLoader):
"""PyTorch dataloader for batch-iterating over a set of edges, generating the list
of message flow graphs (MFGs) as computation dependency of the said minibatch for
edge classification, edge regression, and link prediction.
@@ -866,8 +835,9 @@ class EdgeDataLoader:
* Link prediction on heterogeneous graph: RGCN for link prediction.
"""
collator_arglist = inspect.getfullargspec(EdgeCollator).args
def __init__(self, g, eids, graph_sampler, device='cpu', use_ddp=False, ddp_seed=0, **kwargs):
def __init__(self, g, eids, graph_sampler, device='cpu', use_ddp=False, ddp_seed=0,
**kwargs):
_check_graph_type(g)
collator_kwargs = {}
dataloader_kwargs = {}
for k, v in kwargs.items():
@@ -876,53 +846,30 @@ class EdgeDataLoader:
else:
dataloader_kwargs[k] = v
if isinstance(g, DistGraph):
if device is None:
# for the distributed case default to the CPU
device = 'cpu'
assert device == 'cpu', 'Only cpu is supported in the case of a DistGraph.'
# Distributed DataLoader currently does not support heterogeneous graphs
# and does not copy features. Fallback to normal solution
self.collator = EdgeCollator(g, eids, graph_sampler, **collator_kwargs)
_remove_kwargs_dist(dataloader_kwargs)
self.dataloader = DistDataLoader(self.collator.dataset,
collate_fn=self.collator.collate,
**dataloader_kwargs)
self.is_distributed = True
else:
if device is None:
# default to the same device the graph is on
device = th.device(g.device)
# if the sampler supports it, tell it to output to the
# specified device
num_workers = dataloader_kwargs.get('num_workers', 0)
if callable(getattr(graph_sampler, "set_output_context", None)) and num_workers == 0:
graph_sampler.set_output_context(to_dgl_context(device))
# if the sampler supports it, tell it to output to the
# specified device
num_workers = dataloader_kwargs.get('num_workers', 0)
if callable(getattr(graph_sampler, "set_output_context", None)) and num_workers == 0:
graph_sampler.set_output_context(to_dgl_context(device))
self.collator = _EdgeCollator(g, eids, graph_sampler, **collator_kwargs)
self.use_scalar_batcher, self.scalar_batcher, self.dataloader, self.dist_sampler = \
_init_dataloader(self.collator, device, dataloader_kwargs, use_ddp, ddp_seed)
self.use_ddp = use_ddp
self.is_distributed = False
self.collator = EdgeCollator(g, eids, graph_sampler, **collator_kwargs)
self.use_scalar_batcher, self.scalar_batcher, dataset, collator, self.dist_sampler = \
_init_dataloader(self.collator, device, dataloader_kwargs, use_ddp, ddp_seed)
self.use_ddp = use_ddp
super().__init__(dataset, collate_fn=collator.collate, **dataloader_kwargs)
# Precompute the CSR and CSC representations so each subprocess does not duplicate.
if num_workers > 0:
g.create_formats_()
# Precompute the CSR and CSC representations so each subprocess does not duplicate.
if num_workers > 0:
g.create_formats_()
self.device = device
def __iter__(self):
"""Return the iterator of the data loader."""
if self.is_distributed:
# Directly use the iterator of DistDataLoader, which doesn't copy features anyway.
return iter(self.dataloader)
else:
return _EdgeDataLoaderIter(self)
def __len__(self):
"""Return the number of batches of the data loader."""
return len(self.dataloader)
return _EdgeDataLoaderIter(self, super().__iter__())
def set_epoch(self, epoch):
"""Sets the epoch number for the underlying sampler which ensures all replicas
@@ -945,7 +892,7 @@ class EdgeDataLoader:
else:
raise DGLError('set_epoch is only available when use_ddp is True.')
class GraphDataLoader:
class GraphDataLoader(DataLoader):
"""PyTorch dataloader for batch-iterating over a set of graphs, generating the batched
graph and corresponding label tensor (if provided) of the said minibatch.
@@ -992,7 +939,6 @@ class GraphDataLoader:
... train_on(batched_graph, labels)
"""
collator_arglist = inspect.getfullargspec(GraphCollator).args
def __init__(self, dataset, collate_fn=None, use_ddp=False, ddp_seed=0, **kwargs):
collator_kwargs = {}
dataloader_kwargs = {}
@@ -1027,14 +973,11 @@ class GraphDataLoader:
if use_ddp:
self.dist_sampler = _create_dist_sampler(dataset, dataloader_kwargs, ddp_seed)
dataloader_kwargs['sampler'] = self.dist_sampler
self.dataloader = DataLoader(dataset=dataset,
collate_fn=self.collate,
**dataloader_kwargs)
super().__init__(dataset, collate_fn=self.collate, **dataloader_kwargs)
def __iter__(self):
"""Return the iterator of the data loader."""
return _GraphDataLoaderIter(self)
return _GraphDataLoaderIter(self, super().__iter__())
def __len__(self):
"""Return the number of batches of the data loader."""

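Reviewer note on the refactoring in this file: the loaders now subclass the PyTorch `DataLoader`, reject `DistGraph` up front, and wrap `super().__iter__()` in a post-processing iterator instead of holding a separate `self.dataloader`. The following is a minimal pure-Python sketch of that pattern. `BaseLoader`, `DistributedGraph`, `_PostProcessIter` and `ScaledLoader` are hypothetical stand-ins for illustration, not DGL or PyTorch classes.

```python
class BaseLoader:
    """Hypothetical stand-in for torch.utils.data.DataLoader."""

    def __init__(self, dataset, batch_size=1):
        self.dataset = list(dataset)
        self.batch_size = batch_size

    def __iter__(self):
        for start in range(0, len(self.dataset), self.batch_size):
            yield self.dataset[start:start + self.batch_size]

    def __len__(self):
        return (len(self.dataset) + self.batch_size - 1) // self.batch_size


class DistributedGraph:
    """Hypothetical marker type playing the role of DistGraph."""


class _PostProcessIter:
    """Wraps the parent iterator and post-processes each batch."""

    def __init__(self, loader, iter_):
        self.loader = loader
        self.iter_ = iter_

    def __iter__(self):
        return self

    def __next__(self):
        batch = next(self.iter_)  # StopIteration ends the loop as usual
        return [self.loader.scale * x for x in batch]


class ScaledLoader(BaseLoader):
    def __init__(self, g, dataset, scale=1, **kwargs):
        # Guard in the spirit of _check_graph_type: fail early on the wrong graph type.
        if isinstance(g, DistributedGraph):
            raise TypeError("use the distributed loader for a distributed graph")
        super().__init__(dataset, **kwargs)
        self.scale = scale

    def __iter__(self):
        # The parent builds the raw iterator; the wrapper adds post-processing.
        return _PostProcessIter(self, super().__iter__())


loader = ScaledLoader(object(), range(5), scale=10, batch_size=2)
batches = list(loader)
print(batches, len(loader))
```

Because the subclass inherits `__len__` from its base, it no longer needs its own forwarding method, which matches the removal of `__len__` from `NodeDataLoader` and `EdgeDataLoader` in this diff.
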
View File

@@ -0,0 +1,102 @@
"""Distributed dataloaders.
"""
import inspect
from ...distributed import DistDataLoader
from ..dataloader import NodeCollator, EdgeCollator
def _remove_kwargs_dist(kwargs):
if 'num_workers' in kwargs:
del kwargs['num_workers']
if 'pin_memory' in kwargs:
del kwargs['pin_memory']
print('Distributed DataLoaders do not support pin_memory.')
return kwargs
class DistNodeDataLoader(DistDataLoader):
"""PyTorch dataloader for batch-iterating over a set of nodes, generating the list
of message flow graphs (MFGs) as computation dependency of the said minibatch, on
a distributed graph.
All the arguments have the same meaning as the single-machine counterpart
:class:`dgl.dataloading.pytorch.NodeDataLoader` except the first argument
:attr:`g` which must be a :class:`dgl.distributed.DistGraph`.
Parameters
----------
g : DistGraph
The distributed graph.
nids, graph_sampler, device, kwargs :
See :class:`dgl.dataloading.pytorch.NodeDataLoader`.
See also
--------
dgl.dataloading.pytorch.NodeDataLoader
"""
def __init__(self, g, nids, graph_sampler, device=None, **kwargs):
collator_kwargs = {}
dataloader_kwargs = {}
_collator_arglist = inspect.getfullargspec(NodeCollator).args
for k, v in kwargs.items():
if k in _collator_arglist:
collator_kwargs[k] = v
else:
dataloader_kwargs[k] = v
if device is None:
# for the distributed case default to the CPU
device = 'cpu'
assert device == 'cpu', 'Only cpu is supported in the case of a DistGraph.'
# Distributed DataLoader currently does not support heterogeneous graphs
# and does not copy features. Fallback to normal solution
self.collator = NodeCollator(g, nids, graph_sampler, **collator_kwargs)
_remove_kwargs_dist(dataloader_kwargs)
super().__init__(self.collator.dataset,
collate_fn=self.collator.collate,
**dataloader_kwargs)
self.device = device
class DistEdgeDataLoader(DistDataLoader):
"""PyTorch dataloader for batch-iterating over a set of edges, generating the list
of message flow graphs (MFGs) as computation dependency of the said minibatch for
edge classification, edge regression, and link prediction, on a distributed
graph.
All the arguments have the same meaning as the single-machine counterpart
:class:`dgl.dataloading.pytorch.EdgeDataLoader` except the first argument
:attr:`g` which must be a :class:`dgl.distributed.DistGraph`.
Parameters
----------
g : DistGraph
The distributed graph.
eids, graph_sampler, device, kwargs :
See :class:`dgl.dataloading.pytorch.EdgeDataLoader`.
See also
--------
dgl.dataloading.pytorch.EdgeDataLoader
"""
def __init__(self, g, eids, graph_sampler, device=None, **kwargs):
collator_kwargs = {}
dataloader_kwargs = {}
_collator_arglist = inspect.getfullargspec(EdgeCollator).args
for k, v in kwargs.items():
if k in _collator_arglist:
collator_kwargs[k] = v
else:
dataloader_kwargs[k] = v
if device is None:
# for the distributed case default to the CPU
device = 'cpu'
assert device == 'cpu', 'Only cpu is supported in the case of a DistGraph.'
# Distributed DataLoader currently does not support heterogeneous graphs
# and does not copy features. Fallback to normal solution
self.collator = EdgeCollator(g, eids, graph_sampler, **collator_kwargs)
_remove_kwargs_dist(dataloader_kwargs)
super().__init__(self.collator.dataset,
collate_fn=self.collator.collate,
**dataloader_kwargs)
self.device = device
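
Reviewer note on the new module: both distributed loaders route keyword arguments by inspecting the collator's constructor signature, then drop options the distributed loader does not accept. A minimal, self-contained sketch of that routing follows. `ToyCollator`, `split_kwargs` and `remove_unsupported` are hypothetical names for illustration; unlike `_remove_kwargs_dist`, this version returns a copy rather than mutating its input.

```python
import inspect


class ToyCollator:
    """Hypothetical stand-in for NodeCollator; only its signature matters here."""

    def __init__(self, g, nids, graph_sampler, return_indices=False):
        self.g = g
        self.nids = nids
        self.graph_sampler = graph_sampler
        self.return_indices = return_indices


def split_kwargs(collator_cls, kwargs):
    """Route each keyword to the collator or to the underlying data loader."""
    collator_args = inspect.getfullargspec(collator_cls).args
    collator_kwargs, dataloader_kwargs = {}, {}
    for key, value in kwargs.items():
        if key in collator_args:
            collator_kwargs[key] = value
        else:
            dataloader_kwargs[key] = value
    return collator_kwargs, dataloader_kwargs


def remove_unsupported(dataloader_kwargs, unsupported=("num_workers", "pin_memory")):
    """Return a copy without the options a distributed loader would ignore."""
    return {k: v for k, v in dataloader_kwargs.items() if k not in unsupported}


collator_kwargs, dataloader_kwargs = split_kwargs(
    ToyCollator,
    {"return_indices": True, "batch_size": 1024, "shuffle": True, "num_workers": 4},
)
dataloader_kwargs = remove_unsupported(dataloader_kwargs)
print(collator_kwargs)
print(dataloader_kwargs)
```

One consequence of this design is that any keyword not named in the collator's signature is silently forwarded to the data loader, so a misspelled collator option surfaces as a data loader error rather than a collator error.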

View File

@@ -148,14 +148,14 @@ def start_dist_neg_dataloader(rank, tmpdir, num_server, num_workers, orig_nid, g
num_negs = 5
sampler = dgl.dataloading.MultiLayerNeighborSampler([5,10])
negative_sampler=dgl.dataloading.negative_sampler.Uniform(num_negs)
dataloader = dgl.dataloading.EdgeDataLoader(dist_graph,
train_eid,
sampler,
batch_size=batch_size,
negative_sampler=negative_sampler,
shuffle=True,
drop_last=False,
num_workers=num_workers)
dataloader = dgl.dataloading.DistEdgeDataLoader(dist_graph,
train_eid,
sampler,
batch_size=batch_size,
negative_sampler=negative_sampler,
shuffle=True,
drop_last=False,
num_workers=num_workers)
for _ in range(2):
for _, (_, pos_graph, neg_graph, blocks) in zip(range(0, num_edges_to_sample, batch_size), dataloader):
block = blocks[-1]
@@ -283,7 +283,7 @@ def start_node_dataloader(rank, tmpdir, num_server, num_workers, orig_nid, orig_
# We need to test creating DistDataLoader multiple times.
for i in range(2):
# Create DataLoader for constructing blocks
dataloader = dgl.dataloading.NodeDataLoader(
dataloader = dgl.dataloading.DistNodeDataLoader(
dist_graph,
train_nid,
sampler,
@@ -334,7 +334,7 @@ def start_edge_dataloader(rank, tmpdir, num_server, num_workers, orig_nid, orig_
# We need to test creating DistDataLoader multiple times.
for i in range(2):
# Create DataLoader for constructing blocks
dataloader = dgl.dataloading.EdgeDataLoader(
dataloader = dgl.dataloading.DistEdgeDataLoader(
dist_graph,
train_eid,
sampler,

View File

@@ -265,7 +265,8 @@ Pytorch's `DistributedDataParallel`.
Distributed mini-batch sampler
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
We can use the same `NodeDataLoader` to create a distributed mini-batch sampler for
We can use :class:`~dgl.dataloading.pytorch.DistNodeDataLoader`, the distributed counterpart
of :class:`~dgl.dataloading.pytorch.NodeDataLoader`, to create a distributed mini-batch sampler for
node classification.
@@ -274,10 +275,10 @@ node classification.
.. code-block:: python
sampler = dgl.dataloading.MultiLayerNeighborSampler([25,10])
train_dataloader = dgl.dataloading.NodeDataLoader(
train_dataloader = dgl.dataloading.DistNodeDataLoader(
g, train_nid, sampler, batch_size=1024,
shuffle=True, drop_last=False)
valid_dataloader = dgl.dataloading.NodeDataLoader(
valid_dataloader = dgl.dataloading.DistNodeDataLoader(
g, valid_nid, sampler, batch_size=1024,
shuffle=False, drop_last=False)
@@ -432,4 +433,4 @@ If we split the graph into four partitions as demonstrated at the beginning of t
ip_addr3
ip_addr4
'''
'''