
Commit 8af2403

HYLcool and Cathy0908 authored

Support storing image bytes in the dataset (#725)
* fix wrong mm_key init
* support loading images from a specified bytes key instead of paths only
* fix the wrong context handling logic in the compute_stats_batched method of the basic Filter
* make all image-related filters support loading from bytes data
* make all image-related mappers support loading from bytes data
* make all image-related mappers support updating the bytes data to the latest version
* restore the context behavior for the base op
* support context for the general fused op
* fix minor bugs
* fix a schema alignment problem in the general fused op
* change the parent class of GeneralFusedOP from OP to Mapper
* update DownloadFileMapper and the custom webdataset encoder and decoder (#724):
  * support context for DownloadFileMapper and the custom webdataset encoder and decoder
  * update DownloadFileMapper
  * update RayDataset
  * update the webdataset _custom_default_encoder
  * add a ray tag for the webdataset test case
* move webdataset_utils to the utils module and add a customizable reconstruct function
* support customized webdataset format reconstruction before exporting
* support export_type
* update exporter args in the analyzer
* fix the logic for the default alignment in download_file_mapper
* specify the stdout encoding for the new process
* refactor the export method of RayExporter to the _router style
* fix docstring

Co-authored-by: Cathy0908 <30484308+Cathy0908@users.noreply.github.com>
1 parent 800a1d9 commit 8af2403

Note: large commits have some content hidden by default, so only part of the diff is shown below.

42 files changed: +1011 -137 lines

configs/config_all.yaml

Lines changed: 9 additions & 3 deletions
@@ -27,8 +27,13 @@ validators: # validators are a l
 text: 'str'

 export_path: '/path/to/result/dataset.jsonl'  # path to processed result dataset. Supported suffixes include ['jsonl', 'json', 'parquet']
+export_type: 'jsonl'  # The export format type. If it's not specified, Data-Juicer will parse it from the export_path. The supported types can be found in Exporter._router() for standalone mode and RayExporter._SUPPORTED_FORMATS for ray mode
 export_shard_size: 0  # shard size of the exported dataset in bytes. By default it's 0, which means the whole dataset is exported into a single file. If it's set to a positive number, the exported dataset will be split into several shards, and the max size of each shard won't be larger than export_shard_size
 export_in_parallel: false  # whether to export the result dataset in parallel to a single file, which usually takes less time. It only works when export_shard_size is 0, and its default number of processes is the same as the argument np. **Notice**: If it's True, sometimes exporting in parallel might require much more time due to IO blocking, especially for very large datasets. When this happens, False is a better choice, although it takes more time.
+keep_stats_in_res_ds: false  # whether to keep the computed stats in the result dataset. The intermediate fields that store the stats computed by Filters will be removed if it's False. It's False by default.
+keep_hashes_in_res_ds: false  # whether to keep the computed hashes in the result dataset. The intermediate fields that store the hashes computed by Deduplicators will be removed if it's False. It's False by default.
+export_extra_args: {}  # other optional arguments for exporting, as a dict. For example, the key mapping info for exporting in WebDataset format.
+
 np: 4  # number of subprocesses to process your dataset
 text_keys: 'text'  # the key name of the field where the sample texts to be processed are stored, e.g., `text`, `instruction`, `output`, ...
 # Note: currently, we support specifying only ONE key for each op. For cases requiring multiple keys, users can specify the op multiple times. We will only use the first key of `text_keys` when you set multiple keys.

@@ -46,12 +51,11 @@ trace_num: 10 # number of samples
 op_fusion: false  # whether to fuse operators that share the same intermediate variables automatically. Op fusion might reduce the memory requirements slightly but speed up the whole process.
 fusion_strategy: 'probe'  # OP fusion strategy. Supports ['greedy', 'probe'] now. 'greedy' means keep the basic OP order and put the fused OP at the end of each fused OP group. 'probe' means Data-Juicer will probe the running speed of each OP at the beginning and reorder the OPs and fused OPs according to their probed speed (fast to slow). It's 'probe' by default.
 cache_compress: null  # the compression method of the cache file, which can be specified in ['gzip', 'zstd', 'lz4']. If this parameter is None, the cache file will not be compressed. We recommend turning this argument on when your input dataset is larger than tens of GB and your disk space is limited.
-keep_stats_in_res_ds: false  # whether to keep the computed stats in the result dataset. The intermediate fields that store the stats computed by Filters will be removed if it's False. It's False by default.
-keep_hashes_in_res_ds: false  # whether to keep the computed hashes in the result dataset. The intermediate fields that store the hashes computed by Deduplicators will be removed if it's False. It's False by default.
 adaptive_batch_size: false  # whether to use adaptive batch sizes for each OP according to the probed results. It's False by default.

 # for multimodal data processing
 image_key: 'images'  # key name of the field that stores the list of sample image paths.
+image_bytes_key: 'image_bytes'  # key name of the field that stores the list of sample image bytes.
 image_special_token: '<__dj__image>'  # the special token that represents an image in the text. By default, it's "<__dj__image>". You can specify your own special token according to your input dataset.
 audio_key: 'audios'  # key name of the field that stores the list of sample audio paths.
 audio_special_token: '<__dj__audio>'  # the special token that represents an audio in the text. By default, it's "<__dj__audio>". You can specify your own special token according to your input dataset.

@@ -273,10 +277,12 @@ process:
   - extract_tables_from_html_mapper:  # extract tables from HTML content
       tables_field_name: 'html_tables'  # Field name to store the extracted tables.
       retain_html_tags: false,  # If True, retains HTML tags in the tables; otherwise, removes them.
-      include_header: true,  # If True, includes the table header; otherwise, excludes it. This parameter is effective only when `retain_html_tags` is False and applies solely to the extracted table content.
+      include_header: true  # If True, includes the table header; otherwise, excludes it. This parameter is effective only when `retain_html_tags` is False and applies solely to the extracted table content.
   - download_file_mapper:  # download url files to local files
       save_dir: null  # The directory to save downloaded files.
       download_field: null  # The field name to get the url to download.
+      save_field: null  # The field name to save the downloaded file content.
+      resume_download: false  # Whether to resume download. If True, skip the sample if it already exists.
       timeout: 30  # The timeout in seconds for each HTTP request.
       max_concurrent: 10  # Maximum concurrent downloads.
   - fix_unicode_mapper:  # fix unicode errors in text.
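To make the new key concrete: `image_key` and `image_bytes_key` are parallel fields, one holding paths and the other raw encoded bytes. A minimal sketch of such a sample in Python (the path and text are hypothetical; only the key names come from the config above):

# Hypothetical multimodal sample; `images` (image_key) holds paths while
# `image_bytes` (image_bytes_key) holds the raw encoded bytes, aligned by index.
sample = {
    "text": "<__dj__image> a photo of a cat",
    "images": ["imgs/cat.jpg"],
    "image_bytes": [open("imgs/cat.jpg", "rb").read()],
}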

data_juicer/config/config.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,14 @@ def init_configs(args: Optional[List[str]] = None, which_entry: object = None, l
167167
"directory to store the processed dataset will be the work "
168168
"directory of this process.",
169169
)
170+
parser.add_argument(
171+
"--export_type",
172+
type=str,
173+
default=None,
174+
help="The export format type. If it's not specified, Data-Juicer will parse from the export_path. The "
175+
"supported types can be found in Exporter._router() for standalone mode and "
176+
"RayExporter._SUPPORTED_FORMATS for ray mode",
177+
)
170178
parser.add_argument(
171179
"--export_shard_size",
172180
type=NonNegativeInt,
@@ -190,6 +198,13 @@ def init_configs(args: Optional[List[str]] = None, which_entry: object = None, l
190198
"When this happens, False is a better choice, although it takes "
191199
"more time.",
192200
)
201+
parser.add_argument(
202+
"--export_extra_args",
203+
type=Dict,
204+
default={},
205+
help="Other optional arguments for exporting in dict. For example, the key mapping info for exporting "
206+
"the WebDataset format.",
207+
)
193208
parser.add_argument(
194209
"--keep_stats_in_res_ds",
195210
type=bool,
@@ -224,6 +239,12 @@ def init_configs(args: Optional[List[str]] = None, which_entry: object = None, l
224239
default="images",
225240
help="Key name of field to store the list of sample image paths.", # noqa: E251
226241
)
242+
parser.add_argument(
243+
"--image_bytes_key",
244+
type=str,
245+
default="image_bytes",
246+
help="Key name of field to store the list of sample image bytes.", # noqa: E251
247+
)
227248
parser.add_argument(
228249
"--image_special_token",
229250
type=str,
@@ -667,6 +688,7 @@ def init_setup_from_cfg(cfg: Namespace, load_configs_only=False):
667688
"image_key": cfg.get("image_key", "images"),
668689
"audio_key": cfg.get("audio_key", "audios"),
669690
"video_key": cfg.get("video_key", "videos"),
691+
"image_bytes_key": cfg.get("image_bytes_key", "image_bytes"),
670692
"num_proc": cfg.np,
671693
"turbo": cfg.get("turbo", False),
672694
"skip_op_error": cfg.get("skip_op_error", True),

data_juicer/core/analyzer.py

Lines changed: 1 addition & 0 deletions
@@ -58,6 +58,7 @@ def __init__(self, cfg: Optional[Namespace] = None):
         logger.info("Preparing exporter...")
         self.exporter = Exporter(
             self.cfg.export_path,
+            self.cfg.export_type,
             self.cfg.export_shard_size,
             self.cfg.export_in_parallel,
             self.cfg.np,

data_juicer/core/data/ray_dataset.py

Lines changed: 10 additions & 5 deletions
@@ -16,8 +16,8 @@
 from data_juicer.utils.constant import Fields
 from data_juicer.utils.file_utils import is_remote_path
 from data_juicer.utils.lazy_loader import LazyLoader
-from data_juicer.utils.mm_utils import SpecialTokens
 from data_juicer.utils.process_utils import calculate_np
+from data_juicer.utils.webdataset_utils import _custom_default_decoder

 ray = LazyLoader("ray")

@@ -53,9 +53,9 @@ def set_dataset_to_absolute_path(dataset, dataset_path, cfg):
     path_keys = []
     columns = dataset.columns()
     for key in [
-        cfg.get("video_key", SpecialTokens.video),
-        cfg.get("image_key", SpecialTokens.image),
-        cfg.get("audio_key", SpecialTokens.audio),
+        cfg.get("video_key", "videos"),
+        cfg.get("image_key", "images"),
+        cfg.get("audio_key", "audios"),
     ]:
         if key in columns:
             path_keys.append(key)

@@ -239,6 +239,8 @@ def process_batch_arrow(table: pyarrow.Table):
     def read(cls, data_format: str, paths: Union[str, List[str]]) -> RayDataset:
         if data_format in {"json", "jsonl"}:
             return RayDataset.read_json(paths)
+        elif data_format == "webdataset":
+            return RayDataset.read_webdataset(paths)
         elif data_format in {
             "parquet",
             "images",

@@ -248,7 +250,6 @@ def read(cls, data_format: str, paths: Union[str, List[str]]) -> RayDataset:
             "avro",
             "numpy",
             "tfrecords",
-            "webdataset",
             "binary_files",
             "lance",
         }:

@@ -266,6 +267,10 @@ def read_json(cls, paths: Union[str, List[str]]) -> RayDataset:
         except AttributeError:
             return ray.data.read_json(paths)

+    @classmethod
+    def read_webdataset(cls, paths: Union[str, List[str]]) -> RayDataset:
+        return ray.data.read_webdataset(paths, decoder=partial(_custom_default_decoder, format="PIL"))
+
     def to_list(self) -> list:
         return self.data.to_pandas().to_dict(orient="records")
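With the dedicated branch above, WebDataset shards no longer go through Ray's default decoder. A minimal usage sketch (the shard path is hypothetical):

from data_juicer.core.data.ray_dataset import RayDataset  # import path from this diff

# Dispatches to RayDataset.read_webdataset, which decodes samples with
# _custom_default_decoder (images are returned as PIL objects).
ds = RayDataset.read("webdataset", ["/path/to/shards/part-000000.tar"])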

data_juicer/core/executor/default_executor.py

Lines changed: 2 additions & 0 deletions
@@ -72,11 +72,13 @@ def __init__(self, cfg: Optional[Namespace] = None):
         logger.info("Preparing exporter...")
         self.exporter = Exporter(
             self.cfg.export_path,
+            self.cfg.export_type,
             self.cfg.export_shard_size,
             self.cfg.export_in_parallel,
             self.cfg.np,
             keep_stats_in_res_ds=self.cfg.keep_stats_in_res_ds,
             keep_hashes_in_res_ds=self.cfg.keep_hashes_in_res_ds,
+            **self.cfg.export_extra_args,
         )

         # setup tracer

data_juicer/core/executor/ray_executor.py

Lines changed: 2 additions & 0 deletions
@@ -68,8 +68,10 @@ def __init__(self, cfg: Optional[Namespace] = None):
         logger.info("Preparing exporter...")
         self.exporter = RayExporter(
             self.cfg.export_path,
+            self.cfg.export_type,
             keep_stats_in_res_ds=self.cfg.keep_stats_in_res_ds,
             keep_hashes_in_res_ds=self.cfg.keep_hashes_in_res_ds,
+            **self.cfg.export_extra_args,
         )

     def run(self, load_data_np: Optional[PositiveInt] = None, skip_return=False):

data_juicer/core/exporter.py

Lines changed: 9 additions & 9 deletions
@@ -18,13 +18,15 @@ class Exporter:
     def __init__(
         self,
         export_path,
+        export_type=None,
         export_shard_size=0,
         export_in_parallel=True,
         num_proc=1,
         export_ds=True,
         keep_stats_in_res_ds=False,
         keep_hashes_in_res_ds=False,
         export_stats=True,
+        **kwargs,
     ):
         """
         Initialization method.

@@ -48,7 +50,13 @@ def __init__(
         self.keep_stats_in_res_ds = keep_stats_in_res_ds
         self.keep_hashes_in_res_ds = keep_hashes_in_res_ds
         self.export_stats = export_stats
-        self.suffix = self._get_suffix(export_path)
+        self.suffix = self._get_suffix(export_path) if export_type is None else export_type
+        support_dict = self._router()
+        if self.suffix not in support_dict:
+            raise NotImplementedError(
+                f"Suffix of export path [{export_path}] or specified export_type [{export_type}] is not supported "
+                f"for now. Only support {list(support_dict.keys())}."
+            )
         self.num_proc = num_proc
         self.max_shard_size_str = ""

@@ -90,14 +98,6 @@ def _get_suffix(self, export_path):
         :return: the suffix of export_path.
         """
         suffix = export_path.split(".")[-1].lower()
-        support_dict = self._router()
-        if suffix not in support_dict:
-            raise NotImplementedError(
-                f"Suffix of export path ["
-                f"{export_path}] is not supported "
-                f"for now. Only support "
-                f"{list(support_dict.keys())}."
-            )
         return suffix

     def _export_impl(self, dataset, export_path, suffix, export_stats=True):
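The net effect: the format check moves from _get_suffix into __init__, and an explicit export_type takes precedence over the path suffix. A minimal sketch of the resulting behavior (paths are illustrative):

from data_juicer.core.exporter import Exporter  # import path from this diff

# Unchanged default: the format is parsed from the path suffix.
exporter = Exporter("/path/to/result/dataset.jsonl")

# New: export_type overrides the suffix, so non-standard extensions work.
exporter = Exporter("/path/to/result/dataset.data", export_type="jsonl")

# Unsupported formats now fail fast at construction time.
try:
    Exporter("/path/to/result/dataset.data", export_type="xml")
except NotImplementedError as e:
    print(e)  # lists the supported keys from Exporter._router()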

data_juicer/core/ray_exporter.py

Lines changed: 80 additions & 11 deletions
@@ -1,8 +1,10 @@
 import os
+from functools import partial

 from loguru import logger

 from data_juicer.utils.constant import Fields, HashKeys
+from data_juicer.utils.webdataset_utils import reconstruct_custom_webdataset_format


 class RayExporter:

@@ -22,7 +24,7 @@ class RayExporter:
         # 'numpy',
     }

-    def __init__(self, export_path, keep_stats_in_res_ds=True, keep_hashes_in_res_ds=False):
+    def __init__(self, export_path, export_type=None, keep_stats_in_res_ds=True, keep_hashes_in_res_ds=False, **kwargs):
         """
         Initialization method.

@@ -35,7 +37,13 @@ def __init__(self, export_path, keep_stats_in_res_ds=True, keep_hashes_in_res_ds
         self.export_path = export_path
         self.keep_stats_in_res_ds = keep_stats_in_res_ds
         self.keep_hashes_in_res_ds = keep_hashes_in_res_ds
-        self.export_format = self._get_export_format(export_path)
+        self.export_format = self._get_export_format(export_path) if export_type is None else export_type
+        if self.export_format not in self._SUPPORTED_FORMATS:
+            raise NotImplementedError(
+                f'export data format "{self.export_format}" is not supported '
+                f"for now. Only support {self._SUPPORTED_FORMATS}. Please check export_type or export_path."
+            )
+        self.export_extra_args = kwargs if kwargs is not None else {}

     def _get_export_format(self, export_path):
         """

@@ -54,11 +62,6 @@ def _get_export_format(self, export_path):
             suffix = "jsonl"

         export_format = suffix
-        if export_format not in self._SUPPORTED_FORMATS:
-            raise NotImplementedError(
-                f'export data format "{export_format}" is not supported '
-                f"for now. Only support {self._SUPPORTED_FORMATS}."
-            )
         return export_format

     def _export_impl(self, dataset, export_path, columns=None):

@@ -88,10 +91,12 @@ def _export_impl(self, dataset, export_path, columns=None):
         if len(removed_fields):
             dataset = dataset.drop_columns(removed_fields)

-        if self.export_format in {"json", "jsonl"}:
-            return dataset.write_json(export_path, force_ascii=False)
-        else:
-            return getattr(dataset, f"write_{self.export_format}")(export_path)
+        export_method = RayExporter._router()[self.export_format]
+        export_kwargs = {
+            "export_extra_args": self.export_extra_args,
+            "export_format": self.export_format,
+        }
+        return export_method(dataset, export_path, **export_kwargs)

     def export(self, dataset, columns=None):
         """

@@ -102,3 +107,67 @@
         :return:
         """
         self._export_impl(dataset, self.export_path, columns)
+
+    @staticmethod
+    def write_json(dataset, export_path, **kwargs):
+        """
+        Export method for json/jsonl target files.
+
+        :param dataset: the dataset to export.
+        :param export_path: the path to store the exported dataset.
+        :param kwargs: extra arguments.
+        :return:
+        """
+        return dataset.write_json(export_path, force_ascii=False)
+
+    @staticmethod
+    def write_webdataset(dataset, export_path, **kwargs):
+        """
+        Export method for webdataset target files.
+
+        :param dataset: the dataset to export.
+        :param export_path: the path to store the exported dataset.
+        :param kwargs: extra arguments.
+        :return:
+        """
+        from data_juicer.utils.webdataset_utils import _custom_default_encoder
+
+        # check if we need to reconstruct the customized WebDataset format
+        export_extra_args = kwargs.get("export_extra_args", {})
+        field_mapping = export_extra_args.get("field_mapping", {})
+        if len(field_mapping) > 0:
+            reconstruct_func = partial(reconstruct_custom_webdataset_format, field_mapping=field_mapping)
+            dataset = dataset.map(reconstruct_func)
+
+        return dataset.write_webdataset(export_path, encoder=_custom_default_encoder)
+
+    @staticmethod
+    def write_others(dataset, export_path, **kwargs):
+        """
+        Export method for other target files.
+
+        :param dataset: the dataset to export.
+        :param export_path: the path to store the exported dataset.
+        :param kwargs: extra arguments.
+        :return:
+        """
+        export_format = kwargs.get("export_format", "parquet")
+        return getattr(dataset, f"write_{export_format}")(export_path)
+
+    # suffix to export method
+    @staticmethod
+    def _router():
+        """
+        A router from different suffixes to corresponding export methods.
+
+        :return: A dict router.
+        """
+        return {
+            "jsonl": RayExporter.write_json,
+            "json": RayExporter.write_json,
+            "webdataset": RayExporter.write_webdataset,
+            "parquet": RayExporter.write_others,
+            "csv": RayExporter.write_others,
+            "tfrecords": RayExporter.write_others,
+            "lance": RayExporter.write_others,
+        }
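Putting the router and the extra args together: any keyword argument beyond export_type is collected into export_extra_args and forwarded to the chosen write method. A sketch of a WebDataset export with a custom field layout (the field_mapping content and its direction are illustrative, not confirmed by this diff):

from data_juicer.core.ray_exporter import RayExporter  # import path from this diff

exporter = RayExporter(
    "/path/to/result/dataset",
    export_type="webdataset",
    # Collected into export_extra_args; a non-empty field_mapping triggers
    # reconstruct_custom_webdataset_format before dataset.write_webdataset.
    field_mapping={"txt": "text", "jpg": "image_bytes"},
)
exporter.export(ray_dataset)  # `ray_dataset`: a Ray Dataset produced by the pipeline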
