Skip to content

use pyreader to read data in dygraph mode #17314

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
Jun 5, 2019
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion paddle/fluid/API.spec
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ paddle.fluid.io.load_params (ArgSpec(args=['executor', 'dirname', 'main_program'
paddle.fluid.io.load_persistables (ArgSpec(args=['executor', 'dirname', 'main_program', 'filename'], varargs=None, keywords=None, defaults=(None, None)), ('document', '28df5bfe26ca7a077f91156abb0fe6d2'))
paddle.fluid.io.save_inference_model (ArgSpec(args=['dirname', 'feeded_var_names', 'target_vars', 'executor', 'main_program', 'model_filename', 'params_filename', 'export_for_deployment'], varargs=None, keywords=None, defaults=(None, None, None, True)), ('document', 'af82e1b5fe5764029905a191b987f63d'))
paddle.fluid.io.load_inference_model (ArgSpec(args=['dirname', 'executor', 'model_filename', 'params_filename', 'pserver_endpoints'], varargs=None, keywords=None, defaults=(None, None, None)), ('document', 'aaf3392332f2e5ef9d9177985be2c04a'))
paddle.fluid.io.PyReader.__init__ (ArgSpec(args=['self', 'feed_list', 'capacity', 'use_double_buffer', 'iterable'], varargs=None, keywords=None, defaults=(True, False)), ('document', '6adf97f83acf6453d4a6a4b1070f3754'))
paddle.fluid.io.PyReader.__init__ (ArgSpec(args=['self', 'feed_list', 'capacity', 'use_double_buffer', 'iterable', 'return_list'], varargs=None, keywords=None, defaults=(None, 1, True, False, False)), ('document', '6adf97f83acf6453d4a6a4b1070f3754'))
paddle.fluid.io.PyReader.decorate_batch_generator (ArgSpec(args=['self', 'reader', 'places'], varargs=None, keywords=None, defaults=(None,)), ('document', '4a072de39998ee4e0de33fcec11325a6'))
paddle.fluid.io.PyReader.decorate_sample_generator (ArgSpec(args=['self', 'sample_generator', 'batch_size', 'drop_last', 'places'], varargs=None, keywords=None, defaults=(True, None)), ('document', '3db4b24d33fe4f711e303f9673dc5c6a'))
paddle.fluid.io.PyReader.decorate_sample_list_generator (ArgSpec(args=['self', 'reader', 'places'], varargs=None, keywords=None, defaults=(None,)), ('document', '94adc0fb71c4b2ae6c3c74886c9cb898'))
Expand Down
18 changes: 18 additions & 0 deletions paddle/fluid/pybind/reader_py.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class MultiDeviceFeedReader {
public:
using ResultDictList =
std::vector<std::unordered_map<std::string, framework::LoDTensor>>;
using ResultList = std::vector<std::vector<framework::LoDTensor>>;

MultiDeviceFeedReader(
const std::shared_ptr<operators::reader::LoDTensorBlockingQueue> &queue,
Expand Down Expand Up @@ -81,6 +82,21 @@ class MultiDeviceFeedReader {
return result;
}

ResultList ReadNextList() {
bool success = WaitFutures();
if (!success) {
return {};
}

ResultList result;
result.reserve(ret_.size());
for (size_t i = 0; i < ret_.size(); ++i) {
result.emplace_back(std::move(ret_[i]));
}
ReadAsync();
return result;
}

void Reset() {
Shutdown();
Start();
Expand Down Expand Up @@ -142,6 +158,8 @@ void BindReader(py::module *module) {
py::class_<MultiDeviceFeedReader>(m, "MultiDeviceFeedReader", "")
.def("read_next", &MultiDeviceFeedReader::ReadNext,
py::call_guard<py::gil_scoped_release>())
.def("read_next_list", &MultiDeviceFeedReader::ReadNextList,
py::call_guard<py::gil_scoped_release>())
.def("reset", &MultiDeviceFeedReader::Reset,
py::call_guard<py::gil_scoped_release>());

Expand Down
42 changes: 42 additions & 0 deletions python/paddle/fluid/data_feeder.py
Original file line number Diff line number Diff line change
Expand Up @@ -432,3 +432,45 @@ def __reader_creator__():
"not implemented")

return __reader_creator__


class NumpyToLoDTensorConverter(object):
    """Buffers samples and packs them into a single LoDTensor on demand."""

    def __init__(self, place):
        # Place (device) the packed tensor will be set on.
        self.place = place
        self._reset()

    def _reset(self):
        # Start a fresh, empty sample buffer.
        self.data = []

    def feed(self, data):
        # Buffer one sample; conversion is deferred until done().
        self.data.append(data)

    def done(self):
        # Stack the buffered samples into one ndarray, wrap it as a
        # LoDTensor, then clear the buffer for the next batch.
        batch = numpy.array(self.data)
        tensor = core.LoDTensor()
        tensor.set(batch, self.place)
        self._reset()
        return tensor


class ListTensorProvider(object):
    """Turns batches from a user generator into lists of LoDTensors.

    Each outer iteration of ``__call__`` consumes one batch from
    ``generator``, feeds every slot of every sample into a per-slot
    NumpyToLoDTensorConverter, and yields one packed LoDTensor per slot.
    """

    def __init__(self, generator, place):
        self.generator = generator
        self.converters = []  # one NumpyToLoDTensorConverter per input slot
        self.place = place

    def _done(self):
        # Pack one LoDTensor per slot; done() resets each converter.
        return [c.done() for c in self.converters]

    def __call__(self):
        for iterable in self.generator():
            for each_sample in iterable:
                # Lazily create converters the first time a wider sample
                # appears.
                # NOTE(review): if the number of input slots changes between
                # samples, this appends len(each_sample) extra converters and
                # the zip below silently truncates — confirm the slot count
                # is fixed across samples.
                if len(self.converters) < len(each_sample):
                    for item in each_sample:
                        self.converters.append(
                            NumpyToLoDTensorConverter(place=self.place))
                for each_converter, each_slot in six.moves.zip(self.converters,
                                                               each_sample):
                    each_converter.feed(each_slot)
            yield self._done()
7 changes: 5 additions & 2 deletions python/paddle/fluid/framework.py
Original file line number Diff line number Diff line change
Expand Up @@ -652,8 +652,11 @@ def dtype(self):

@property
def lod_level(self):
    """Return the LoD level of this Variable.

    LoD info lives on the static-graph VarDesc; dygraph variables have
    no desc to query, so 0 is returned there.
    """
    if in_dygraph_mode():
        # TODO(minqiyang): Support lod_level in dygraph mode
        return 0
    else:
        return self.desc.lod_level()

@property
def type(self):
Expand Down
133 changes: 104 additions & 29 deletions python/paddle/fluid/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from . import core
from . import core, dygraph
import six
import numpy as np
import threading
from .framework import Program, Variable, program_guard, default_main_program, default_startup_program
from .framework import Program, Variable, program_guard, default_main_program, default_startup_program, in_dygraph_mode
from .executor import global_scope
from .data_feeder import DataFeeder, BatchedTensorProvider
from .data_feeder import DataFeeder, BatchedTensorProvider, ListTensorProvider
from .layers.io import monkey_patch_reader_methods, _copy_reader_var_, double_buffer
from .unique_name import UniqueNameGenerator

Expand Down Expand Up @@ -48,12 +49,13 @@ class PyReader(object):

Args:
feed_list (list(Variable)|tuple(Variable)): feed variable list.
The variables should be created by :code:`fluid.layers.data()`.
The variables should be created by :code:`fluid.layers.data()`.
it can be None under iterable mode.
capacity (int): capacity of the queue maintained in PyReader object.
use_double_buffer (bool): whether to use double_buffer_reader to
speed up data feeding.
iterable (bool): whether the created reader object is iterable.

return_list (bool): whether the return value is presented as a list.
Returns:
reader (Reader): the created reader object.

Expand Down Expand Up @@ -124,7 +126,7 @@ def reader():
return reader

image = fluid.layers.data(name='image', shape=[784, 784], dtype='float32')
reader = fluid.io.PyReader(feed_list=[image], capacity=4, iterable=True)
reader = fluid.io.PyReader(feed_list=[image], capacity=4, iterable=True, return_list=False)

user_defined_reader = reader_creator_random_image(784, 784)
reader.decorate_sample_list_generator(
Expand All @@ -138,26 +140,71 @@ def reader():
for data in reader():
executor.run(feed=data)


3. If return_list=True, the return values would be presented as a list instead of a dict.

.. code-block:: python

import paddle
import paddle.fluid as fluid
import numpy as np

EPOCH_NUM = 3
ITER_NUM = 5
BATCH_SIZE = 10

def reader_creator_random_image(height, width):
def reader():
for i in range(ITER_NUM):
yield np.random.uniform(low=0, high=255, size=[height, width]),
return reader

image = fluid.layers.data(name='image', shape=[784, 784], dtype='float32')
reader = fluid.io.PyReader(feed_list=[image], capacity=4, iterable=True, return_list=True)

user_defined_reader = reader_creator_random_image(784, 784)
reader.decorate_sample_list_generator(
paddle.batch(user_defined_reader, batch_size=BATCH_SIZE),
fluid.core.CPUPlace())
# definition of network is omitted
executor = fluid.Executor(fluid.core.CPUPlace())
executor.run(fluid.default_main_program())

for _ in range(EPOCH_NUM):
for data in reader():
executor.run(feed={"image": data[0]})
"""

unique_name_generator = UniqueNameGenerator()

def __init__(self,
             feed_list=None,
             capacity=1,
             use_double_buffer=True,
             iterable=False,
             return_list=False):
    """Create a PyReader.

    Args:
        feed_list (list(Variable)|tuple(Variable)|None): feed variables;
            may be None only under iterable (dygraph) mode.
        capacity (int): capacity of the internal blocking queue.
        use_double_buffer (bool): whether to use double_buffer_reader to
            speed up data feeding.
        iterable (bool): whether the created reader is iterable.
        return_list (bool): whether batches are returned as lists
            instead of name->tensor dicts.
    """
    self._tensor_reader = None
    self._thread = None
    self._feed_list = feed_list
    # Dygraph only supports the iterable, list-returning reader, so the
    # user-supplied flags are overridden under dygraph mode.
    if in_dygraph_mode():
        self._iterable = True
        self._return_list = True
    else:
        self._iterable = iterable
        self._return_list = return_list
        if not self._feed_list:
            raise Exception("Feed list must be given under static mode.")
    self._use_double_buffer = use_double_buffer
    self._capacity = capacity
    if not self._iterable:
        self._init_non_iterable()

def _init_iterable(self, places):
self._var_names = [v.name for v in self._feed_list]
if in_dygraph_mode():
self._var_names = []
else:
self._var_names = [v.name for v in self._feed_list]
self._places = _convert_places(places)
self._queue = core.init_lod_tensor_blocking_queue(core.Variable(),
self._capacity)
Expand Down Expand Up @@ -240,6 +287,7 @@ class Iterator(object):
def __init__(self, reader):
self._reader = reader._reader
self._reset = reader._reset
self._return_list = reader._return_list

def __iter__(self):
return self
Expand All @@ -248,12 +296,29 @@ def __next__(self):
return self.next()

def next(self):
    # Python 2 iteration entry point (__next__ delegates here).
    # Raises StopIteration and resets the reader when data is exhausted.
    if not in_dygraph_mode():
        ret = None
        if self._return_list:
            ret = self._reader.read_next_list()
            # NOTE(review): only the first element of the returned list is
            # kept here — confirm this is correct when multi-card training
            # is enabled.
            ret = ret[0] if ret is not None and len(ret) > 0 else None
        else:
            ret = self._reader.read_next()
        if ret:
            return ret
        else:
            self._reset()
            raise StopIteration
    else:
        # Dygraph mode always reads list-style batches and wraps each
        # slot as a dygraph Variable.
        ret = self._reader.read_next_list()
        if ret and ret[0]:
            return [
                dygraph.base.to_variable(np.array(v)) for v in ret[0]
            ]
        else:
            self._reset()
            raise StopIteration

self._start()
return Iterator(self)
Expand Down Expand Up @@ -293,8 +358,9 @@ def generator():
break

'''
assert not self._iterable, "start() cannot be called when PyReader is iterable"
self._start()
if not in_dygraph_mode():
assert not self._iterable, "start() cannot be called when PyReader is iterable"
self._start()

def reset(self):
'''
Expand Down Expand Up @@ -327,8 +393,9 @@ def generator():
break

'''
assert not self._iterable, "reset() cannot be called when PyReader is iterable"
self._reset()
if not in_dygraph_mode():
assert not self._iterable, "reset() cannot be called when PyReader is iterable"
self._reset()

def _start(self):
def __thread_main__():
Expand Down Expand Up @@ -488,14 +555,22 @@ def generator():
'''
assert self._tensor_reader is None, \
"Cannot reset the data source of PyReader"
with program_guard(Program(), Program()):
feeder = DataFeeder(
feed_list=self._feed_list, place=core.CPUPlace())
paddle_reader = feeder.decorate_reader(reader, multi_devices=False)

def __tensor_reader_impl__():
for slots in paddle_reader():
yield [slots[var.name] for var in self._feed_list]
if not in_dygraph_mode():
with program_guard(Program(), Program()):
feeder = DataFeeder(
feed_list=self._feed_list, place=core.CPUPlace())
paddle_reader = feeder.decorate_reader(
reader, multi_devices=False)

def __tensor_reader_impl__():
for slots in paddle_reader():
yield [slots[var.name] for var in self._feed_list]
else:
provider = ListTensorProvider(reader, place=core.CPUPlace())

def __tensor_reader_impl__():
for slots in provider():
yield slots

self.decorate_batch_generator(__tensor_reader_impl__, places)

Expand Down
14 changes: 8 additions & 6 deletions python/paddle/fluid/tests/demo/pyreader.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,19 +80,21 @@ def main():
train_reader.start()
try:
while True:
print 'train_loss', numpy.array(
trainer.run(fetch_list=[loss.name]))
print(
'train_loss',
numpy.array(trainer.run(fetch_list=[loss.name])))
except fluid.core.EOFException:
print 'End of epoch', epoch_id
print('End of epoch', epoch_id)
train_reader.reset()

test_reader.start()
try:
while True:
print 'test loss', numpy.array(
tester.run(fetch_list=[test_loss.name]))
print(
'test loss',
numpy.array(tester.run(fetch_list=[test_loss.name])))
except fluid.core.EOFException:
print 'End of testing'
print('End of testing')
test_reader.reset()


Expand Down
Loading