Skip to content

Commit a4b9305

Browse files
add chunk serializer & tests (RDFLib#1968)
This file provides a single function `serialize_in_chunks()` which can serialize a Graph into a number of NT files with a maximum number of triples or maximum file size. There is an option to preserve any prefixes declared for the original graph in the first file, which will be a Turtle file. Co-authored-by: Iwan Aucamp <aucampia@gmail.com>
1 parent 131d9e6 commit a4b9305

File tree

5 files changed

+362
-10
lines changed

5 files changed

+362
-10
lines changed

CHANGELOG.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,23 @@ CHANGE BARRIER is intended to reduce the potential for merge conflicts
2121
and will be removed for release.
2222
-->
2323

24+
25+
<!-- -->
26+
<!-- -->
27+
<!-- CHANGE BARRIER: START -->
28+
<!-- -->
29+
<!-- -->
30+
31+
- Add chunk serializer that facilitates the encoding of a graph into multiple
32+
N-Triples encoded chunks.
33+
[PR #1968](https://github.com/RDFLib/rdflib/pull/1968).
34+
35+
<!-- -->
36+
<!-- -->
37+
<!-- CHANGE BARRIER: END -->
38+
<!-- -->
39+
<!-- -->
40+
2441
<!-- -->
2542
<!-- -->
2643
<!-- CHANGE BARRIER: START -->

rdflib/plugins/serializers/nt.py

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,21 @@
1+
from __future__ import annotations
2+
13
"""
24
N-Triples RDF graph serializer for RDFLib.
35
See <http://www.w3.org/TR/rdf-testcases/#ntriples> for details about the
46
format.
57
"""
68
import codecs
79
import warnings
8-
from typing import IO, Optional
10+
from typing import IO, TYPE_CHECKING, Optional, Tuple, Union
911

1012
from rdflib.graph import Graph
1113
from rdflib.serializer import Serializer
1214
from rdflib.term import Literal
1315

16+
if TYPE_CHECKING:
17+
from rdflib.graph import _TripleType
18+
1419
__all__ = ["NTSerializer"]
1520

1621

@@ -52,18 +57,20 @@ def __init__(self, store: Graph):
5257
Serializer.__init__(self, store) # default to utf-8
5358

5459

55-
def _nt_row(triple):
60+
def _nt_row(triple: _TripleType):
5661
if isinstance(triple[2], Literal):
5762
return "%s %s %s .\n" % (
58-
triple[0].n3(),
59-
triple[1].n3(),
63+
# type error: "Node" has no attribute "n3"
64+
triple[0].n3(), # type: ignore[attr-defined]
65+
triple[1].n3(), # type: ignore[attr-defined]
6066
_quoteLiteral(triple[2]),
6167
)
6268
else:
63-
return "%s %s %s .\n" % (triple[0].n3(), triple[1].n3(), triple[2].n3())
69+
# type error: "Node" has no attribute "n3"
70+
return "%s %s %s .\n" % (triple[0].n3(), triple[1].n3(), triple[2].n3()) # type: ignore[attr-defined]
6471

6572

66-
def _quoteLiteral(l_):
73+
def _quoteLiteral(l_: Literal) -> str: # noqa: N802
6774
"""
6875
a simpler version of term.Literal.n3()
6976
"""
@@ -80,13 +87,15 @@ def _quoteLiteral(l_):
8087
return "%s" % encoded
8188

8289

83-
def _quote_encode(l_):
90+
def _quote_encode(l_: str) -> str:
8491
return '"%s"' % l_.replace("\\", "\\\\").replace("\n", "\\n").replace(
8592
'"', '\\"'
8693
).replace("\r", "\\r")
8794

8895

89-
def _nt_unicode_error_resolver(err):
96+
def _nt_unicode_error_resolver(
97+
err: UnicodeError,
98+
) -> Tuple[Union[str, bytes], int]:
9099
"""
91100
Do unicode char replaces as defined in https://www.w3.org/TR/2004/REC-rdf-testcases-20040210/#ntrip_strings
92101
"""
@@ -96,8 +105,12 @@ def _replace_single(c):
96105
fmt = "\\u%04X" if c <= 0xFFFF else "\\U%08X"
97106
return fmt % c
98107

99-
string = err.object[err.start : err.end]
100-
return "".join(_replace_single(c) for c in string), err.end
108+
# type error: "UnicodeError" has no attribute "object"
109+
# type error: "UnicodeError" has no attribute "start"
110+
# type error: "UnicodeError" has no attribute "end"
111+
string = err.object[err.start : err.end] # type: ignore[attr-defined]
112+
# type error: "UnicodeError" has no attribute "end"
113+
return "".join(_replace_single(c) for c in string), err.end # type: ignore[attr-defined]
101114

102115

103116
codecs.register_error("_rdflib_nt_escape", _nt_unicode_error_resolver)

rdflib/tools/chunk_serializer.py

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
"""
2+
This file provides a single function `serialize_in_chunks()` which can serialize a
3+
Graph into a number of NT files with a maximum number of triples or maximum file size.
4+
5+
There is an option to preserve any prefixes declared for the original graph in the first
6+
file, which will be a Turtle file.
7+
"""
8+
9+
from contextlib import ExitStack, contextmanager
10+
from pathlib import Path
11+
from typing import TYPE_CHECKING, BinaryIO, Generator, Optional, Tuple
12+
13+
from rdflib.graph import Graph
14+
from rdflib.plugins.serializers.nt import _nt_row
15+
16+
# from rdflib.term import Literal
17+
18+
# if TYPE_CHECKING:
19+
# from rdflib.graph import _TriplePatternType
20+
21+
__all__ = ["serialize_in_chunks"]
22+
23+
24+
def serialize_in_chunks(
25+
g: Graph,
26+
max_triples: int = 10000,
27+
max_file_size_kb: Optional[int] = None,
28+
file_name_stem: str = "chunk",
29+
output_dir: Optional[Path] = None,
30+
write_prefixes: bool = False,
31+
) -> None:
32+
"""
33+
Serializes a given Graph into a series of n-triples with a given length.
34+
35+
:param g:
36+
The graph to serialize.
37+
38+
:param max_file_size_kb:
39+
Maximum size per NT file in kB (1,000 bytes)
40+
Equivalent to ~6,000 triples, depending on Literal sizes.
41+
42+
:param max_triples:
43+
Maximum size per NT file in triples
44+
Equivalent to lines in file.
45+
46+
If both this parameter and max_file_size_kb are set, max_file_size_kb will be used.
47+
48+
:param file_name_stem:
49+
Prefix of each file name.
50+
e.g. "chunk" = chunk_000001.nt, chunk_000002.nt...
51+
52+
:param output_dir:
53+
The directory you want the files to be written to.
54+
55+
:param write_prefixes:
56+
The first file created is a Turtle file containing original graph prefixes.
57+
58+
59+
See ``../test/test_tools/test_chunk_serializer.py`` for examples of this in use.
60+
"""
61+
62+
if output_dir is None:
63+
output_dir = Path.cwd()
64+
65+
if not output_dir.is_dir():
66+
raise ValueError(
67+
"If you specify an output_dir, it must actually be a directory!"
68+
)
69+
70+
@contextmanager
71+
def _start_new_file(file_no: int) -> Generator[Tuple[Path, BinaryIO], None, None]:
72+
if TYPE_CHECKING:
73+
# this is here because mypy gets a bit confused
74+
assert output_dir is not None
75+
fp = Path(output_dir) / f"{file_name_stem}_{str(file_no).zfill(6)}.nt"
76+
with open(fp, "wb") as fh:
77+
yield fp, fh
78+
79+
def _serialize_prefixes(g: Graph) -> str:
80+
pres = []
81+
for k, v in g.namespace_manager.namespaces():
82+
pres.append(f"PREFIX {k}: <{v}>")
83+
84+
return "\n".join(sorted(pres)) + "\n"
85+
86+
if write_prefixes:
87+
with open(
88+
Path(output_dir) / f"{file_name_stem}_000000.ttl", "w", encoding="utf-8"
89+
) as fh:
90+
fh.write(_serialize_prefixes(g))
91+
92+
bytes_written = 0
93+
with ExitStack() as xstack:
94+
if max_file_size_kb is not None:
95+
max_file_size = max_file_size_kb * 1000
96+
file_no = 1 if write_prefixes else 0
97+
for i, t in enumerate(g.triples((None, None, None))):
98+
row_bytes = _nt_row(t).encode("utf-8")
99+
if len(row_bytes) > max_file_size:
100+
raise ValueError(
101+
f"cannot write triple {t!r} as it's serialized size of {row_bytes / 1000} exceeds max_file_size_kb = {max_file_size_kb}"
102+
)
103+
if i == 0:
104+
fp, fhb = xstack.enter_context(_start_new_file(file_no))
105+
bytes_written = 0
106+
elif (bytes_written + len(row_bytes)) >= max_file_size:
107+
file_no += 1
108+
fp, fhb = xstack.enter_context(_start_new_file(file_no))
109+
bytes_written = 0
110+
111+
bytes_written += fhb.write(row_bytes)
112+
113+
else:
114+
# count the triples in the graph
115+
graph_length = len(g)
116+
117+
if graph_length <= max_triples:
118+
# the graph is less than max so just NT serialize the whole thing
119+
g.serialize(
120+
destination=Path(output_dir) / f"{file_name_stem}_all.nt",
121+
format="nt",
122+
)
123+
else:
124+
# graph_length is > max_lines, make enough files for all graph
125+
# no_files = math.ceil(graph_length / max_triples)
126+
file_no = 1 if write_prefixes else 0
127+
for i, t in enumerate(g.triples((None, None, None))):
128+
if i % max_triples == 0:
129+
fp, fhb = xstack.enter_context(_start_new_file(file_no))
130+
file_no += 1
131+
fhb.write(_nt_row(t).encode("utf-8"))
132+
return

0 commit comments

Comments
 (0)