Skip to content

Commit 2ff81f6

Browse files
committed
Support string and integer ID property types
1 parent d1db769 commit 2ff81f6

File tree

9 files changed

+81
-20
lines changed

9 files changed

+81
-20
lines changed

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ python3 redisgraph_bulk_loader/bulk_insert.py GRAPHNAME [OPTIONS]
4949
| -R | --relations-with-type TEXT | Relationship Type followed by path to relationship CSV file |
5050
| -o | --separator CHAR | Field token separator in CSV files (default: comma) |
5151
| -d | --enforce-schema | Requires each cell to adhere to the schema defined in the CSV header |
52+
| -i | --id-type TEXT | The data type of unique node ID properties (either STRING or INTEGER) |
5253
| -s | --skip-invalid-nodes | Skip nodes that reuse previously defined IDs instead of exiting with an error |
5354
| -e | --skip-invalid-edges | Skip edges that use invalid IDs for endpoints instead of exiting with an error |
5455
| -q | --quote INT | The quoting format used in the CSV file. QUOTE_MINIMAL=0,QUOTE_ALL=1,QUOTE_NONNUMERIC=2,QUOTE_NONE=3 |
@@ -146,7 +147,7 @@ The accepted data types are:
146147
| STRING | A string value | Yes |
147148
| ARRAY | An array value | Yes |
148149

149-
If an `ID` column has a name string, the value will be added to each node as a property. Otherwise, it is internal to the bulk loader operation and will not appear in the graph. `START_ID` and `END_ID` columns will never be added as properties.
150+
If an `ID` column has a name string, the value will be added to each node as a property. This property will be a string by default, though it may be switched to integer using the `--id-type` argument. If the name string is not provided, the ID is internal to the bulk loader operation and will not appear in the graph. `START_ID` and `END_ID` columns will never be added as properties.
150151

151152
### ID Namespaces
152153
Typically, node identifiers need to be unique across all input CSVs. When using an input schema, it is (optionally) possible to create ID namespaces, and the identifier only needs to be unique across its namespace. This is particularly useful when each input CSV has primary keys which overlap with others.

redisgraph_bulk_loader/bulk_insert.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ def process_entities(entities):
6060
@click.option('--separator', '-o', default=',', help='Field token separator in csv file')
6161
# Schema options
6262
@click.option('--enforce-schema', '-d', default=False, is_flag=True, help='Enforce the schema described in CSV header rows')
63+
@click.option('--id-type', '-i', default='STRING', help='The data type of unique node ID properties (either STRING or INTEGER)')
6364
@click.option('--skip-invalid-nodes', '-s', default=False, is_flag=True, help='ignore nodes that use previously defined IDs')
6465
@click.option('--skip-invalid-edges', '-e', default=False, is_flag=True, help='ignore invalid edges, print an error message and continue loading (True), or stop loading after an edge loading failure (False)')
6566
@click.option('--quote', '-q', default=0, help='the quoting format used in the CSV file. QUOTE_MINIMAL=0,QUOTE_ALL=1,QUOTE_NONNUMERIC=2,QUOTE_NONE=3')
@@ -70,7 +71,7 @@ def process_entities(entities):
7071
@click.option('--max-token-size', '-t', default=64, help='max size of each token in megabytes (default 64, max 512)')
7172
@click.option('--index', '-i', multiple=True, help='Label:Property on which to create an index')
7273
@click.option('--full-text-index', '-f', multiple=True, help='Label:Property on which to create a full text search index')
73-
def bulk_insert(graph, host, port, password, user, unix_socket_path, nodes, nodes_with_label, relations, relations_with_type, separator, enforce_schema, skip_invalid_nodes, skip_invalid_edges, escapechar, quote, max_token_count, max_buffer_size, max_token_size, index, full_text_index):
74+
def bulk_insert(graph, host, port, password, user, unix_socket_path, nodes, nodes_with_label, relations, relations_with_type, separator, enforce_schema, id_type, skip_invalid_nodes, skip_invalid_edges, escapechar, quote, max_token_count, max_buffer_size, max_token_size, index, full_text_index):
7475
if sys.version_info.major < 3 or sys.version_info.minor < 6:
7576
raise Exception("Python >= 3.6 is required for the RedisGraph bulk loader.")
7677

@@ -83,7 +84,7 @@ def bulk_insert(graph, host, port, password, user, unix_socket_path, nodes, node
8384
store_node_identifiers = any(relations) or any(relations_with_type)
8485

8586
# Initialize configurations with command-line arguments
86-
config = Config(max_token_count, max_buffer_size, max_token_size, enforce_schema, skip_invalid_nodes, skip_invalid_edges, separator, int(quote), store_node_identifiers, escapechar)
87+
config = Config(max_token_count, max_buffer_size, max_token_size, enforce_schema, id_type, skip_invalid_nodes, skip_invalid_edges, separator, int(quote), store_node_identifiers, escapechar)
8788

8889
# Attempt to connect to Redis server
8990
try:

redisgraph_bulk_loader/config.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
1+
from exceptions import SchemaError
2+
3+
14
class Config:
2-
def __init__(self, max_token_count=1024 * 1023, max_buffer_size=64, max_token_size=64, enforce_schema=False, skip_invalid_nodes=False, skip_invalid_edges=False, separator=',', quoting=3, store_node_identifiers=False, escapechar='\\'):
5+
def __init__(self, max_token_count=1024 * 1023, max_buffer_size=64, max_token_size=64, enforce_schema=False, id_type='STRING', skip_invalid_nodes=False, skip_invalid_edges=False, separator=',', quoting=3, store_node_identifiers=False, escapechar='\\'):
36
"""Settings for this run of the bulk loader"""
47
# Maximum number of tokens per query
58
# 1024 * 1024 is the hard-coded Redis maximum. We'll set a slightly lower limit so
@@ -13,6 +16,10 @@ def __init__(self, max_token_count=1024 * 1023, max_buffer_size=64, max_token_si
1316
self.max_token_size = min(max_token_size * 1_000_000, 512 * 1_000_000, self.max_buffer_size)
1417

1518
self.enforce_schema = enforce_schema
19+
id_type = str.upper(id_type)
20+
if id_type != 'STRING' and id_type != 'INTEGER':
21+
raise SchemaError("Specified invalid argument for --id-type, expected STRING or INTEGER")
22+
self.id_type = id_type
1623
self.skip_invalid_nodes = skip_invalid_nodes
1724
self.skip_invalid_edges = skip_invalid_edges
1825
self.separator = separator

redisgraph_bulk_loader/entity_file.py

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,11 @@ class Type(Enum):
2121
INT = 4 # alias to LONG
2222
INTEGER = 4 # alias to LONG
2323
ARRAY = 5
24-
ID = 6
25-
START_ID = 7
26-
END_ID = 8
27-
IGNORE = 9
24+
ID_STRING = 6
25+
ID_INTEGER = 7
26+
START_ID = 8
27+
END_ID = 9
28+
IGNORE = 10
2829

2930

3031
def convert_schema_type(in_type):
@@ -33,8 +34,8 @@ def convert_schema_type(in_type):
3334
except KeyError:
3435
# Handling for ID namespaces
3536
# TODO think of better alternatives
36-
if in_type.startswith('ID('):
37-
return Type.ID
37+
if in_type.startswith('ID'):
38+
return Type.ID_STRING
3839
elif in_type.startswith('START_ID('):
3940
return Type.START_ID
4041
elif in_type.startswith('END_ID('):
@@ -70,8 +71,7 @@ def typed_prop_to_binary(prop_val, prop_type):
7071
# TODO This is not allowed in Cypher, consider how to handle it here rather than in-module.
7172
return struct.pack(format_str, 0)
7273

73-
# TODO allow ID type specification
74-
if prop_type == Type.LONG:
74+
if prop_type == Type.ID_INTEGER or prop_type == Type.LONG:
7575
try:
7676
numeric_prop = int(prop_val)
7777
return struct.pack(format_str + "q", Type.LONG.value, numeric_prop)
@@ -99,7 +99,7 @@ def typed_prop_to_binary(prop_val, prop_type):
9999
else:
100100
raise SchemaError("Could not parse '%s' as a boolean" % prop_val)
101101

102-
elif prop_type == Type.ID or prop_type == Type.STRING:
102+
elif prop_type == Type.ID_STRING or prop_type == Type.STRING:
103103
# If we've reached this point, the property is a string
104104
encoded_str = str.encode(prop_val) # struct.pack requires bytes objects as arguments
105105
# Encoding len+1 adds a null terminator to the string
@@ -112,7 +112,7 @@ def typed_prop_to_binary(prop_val, prop_type):
112112
return array_prop_to_binary(format_str, prop_val)
113113

114114
# If it hasn't returned by this point, it is trying to set it to a type that it can't adopt
115-
raise SchemaError("unable to parse [" + prop_val + "] with type ["+repr(prop_type)+"]")
115+
raise SchemaError("unable to parse [" + prop_val + "] with type [" + repr(prop_type) + "]")
116116

117117

118118
# Convert a single CSV property field with an inferred type into a binary stream.
@@ -238,7 +238,7 @@ def convert_header_with_schema(self, header):
238238
col_type = convert_schema_type(pair[1].upper().strip())
239239

240240
# If the column did not have a name but the type requires one, emit an error.
241-
if len(pair[0]) == 0 and col_type not in (Type.ID, Type.START_ID, Type.END_ID, Type.IGNORE):
241+
if len(pair[0]) == 0 and col_type not in (Type.ID_STRING, Type.ID_INTEGER, Type.START_ID, Type.END_ID, Type.IGNORE):
242242
raise SchemaError("%s: Each property in the header should be a colon-separated pair" % (self.infile.name))
243243
else:
244244
# We have a column name and a type.
@@ -247,6 +247,10 @@ def convert_header_with_schema(self, header):
247247
column_name = pair[0].strip()
248248
self.column_names[idx] = column_name
249249

250+
# ID types may be parsed as strings or integers depending on user specification.
251+
if col_type == Type.ID_STRING and self.config.id_type == 'INTEGER':
252+
col_type = Type.ID_INTEGER
253+
250254
# Store the column type.
251255
self.types[idx] = col_type
252256

redisgraph_bulk_loader/label.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,14 @@ def post_process_header_with_schema(self, header):
3030
return
3131

3232
# Verify that exactly one field is labeled ID.
33-
if self.types.count(Type.ID) != 1:
33+
if (self.types.count(Type.ID_STRING) + self.types.count(Type.ID_INTEGER)) != 1:
3434
raise SchemaError("Node file '%s' should have exactly one ID column."
3535
% (self.infile.name))
36-
self.id = self.types.index(Type.ID) # Track the offset containing the node ID.
36+
# Track the offset containing the node ID.
37+
try:
38+
self.id = self.types.index(Type.ID_STRING)
39+
except ValueError:
40+
self.id = self.types.index(Type.ID_INTEGER)
3741
id_field = header[self.id]
3842
# If the ID field specifies an ID namespace in parentheses like "val:ID(NAMESPACE)", capture the namespace.
3943
match = re.search(r"\((\w+)\)", id_field)

redisgraph_bulk_loader/relation_type.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ def process_schemaless_header(self, header):
2222
self.end_namespace = None
2323

2424
for idx, field in enumerate(header[2:]):
25-
self.column_names[idx+2] = field.strip()
25+
self.column_names[idx + 2] = field.strip()
2626

2727
def post_process_header_with_schema(self, header):
2828
# Can interleave these tasks if preferred.

test/test_bulk_loader.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -714,6 +714,48 @@ def test18_ensure_full_text_index_is_created(self):
714714
# We should find only the tamarins
715715
self.assertEqual(query_result.result_set, expected_result)
716716

717+
def test19_integer_ids(self):
718+
"""Validate that IDs can be persisted as integers."""
719+
720+
graphname = "id_integer_graph"
721+
with open('/tmp/nodes.tmp', mode='w') as csv_file:
722+
out = csv.writer(csv_file)
723+
out.writerow(['id:ID(User)', 'name:STRING'])
724+
out.writerow([0, 'Jeffrey'])
725+
out.writerow([1, 'Filipe'])
726+
727+
with open('/tmp/nodes2.tmp', mode='w') as csv_file:
728+
out = csv.writer(csv_file)
729+
out.writerow(['id:ID(Post)', 'views:INT'])
730+
out.writerow([0, 20])
731+
out.writerow([1, 40])
732+
733+
with open('/tmp/relations.tmp', mode='w') as csv_file:
734+
out = csv.writer(csv_file)
735+
out.writerow([':START_ID(User)', ':END_ID(Post)'])
736+
out.writerow([0, 0])
737+
out.writerow([1, 1])
738+
739+
runner = CliRunner()
740+
res = runner.invoke(bulk_insert, ['--nodes-with-label', 'User', '/tmp/nodes.tmp',
741+
'--nodes-with-label', 'Post', '/tmp/nodes2.tmp',
742+
'--relations-with-type', 'AUTHOR', '/tmp/relations.tmp',
743+
'--enforce-schema',
744+
'--id-type', 'integer',
745+
graphname], catch_exceptions=False)
746+
747+
self.assertEqual(res.exit_code, 0)
748+
self.assertIn('4 nodes created', res.output)
749+
self.assertIn("2 relations created", res.output)
750+
751+
graph = Graph(graphname, self.redis_con)
752+
query_result = graph.query('MATCH (src)-[]->(dest) RETURN src.id, src.name, LABELS(src), dest.id, dest.views, LABELS(dest) ORDER BY src.id')
753+
754+
# The IDs of the results should be parsed as integers
755+
expected_result = [[0, 'Jeffrey', 'User', 0, 20, 'Post'],
756+
[1, 'Filipe', 'User', 1, 40, 'Post']]
757+
self.assertEqual(query_result.result_set, expected_result)
758+
717759

718760
if __name__ == '__main__':
719761
unittest.main()

test/test_config.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ def test01_default_values(self):
1010
self.assertEqual(config.max_buffer_size, 64_000_000)
1111
self.assertEqual(config.max_token_size, 64_000_000)
1212
self.assertEqual(config.enforce_schema, False)
13+
self.assertEqual(config.id_type, 'STRING')
1314
self.assertEqual(config.skip_invalid_nodes, False)
1415
self.assertEqual(config.skip_invalid_edges, False)
1516
self.assertEqual(config.store_node_identifiers, False)
@@ -18,11 +19,12 @@ def test01_default_values(self):
1819

1920
def test02_modified_values(self):
2021
"""Verify that Config_set updates Config class values accordingly."""
21-
config = Config(max_token_count=10, max_buffer_size=500, max_token_size=200, enforce_schema=True, skip_invalid_nodes=True, skip_invalid_edges=True, separator='|', quoting=0)
22+
config = Config(max_token_count=10, max_buffer_size=500, max_token_size=200, enforce_schema=True, id_type='INTEGER', skip_invalid_nodes=True, skip_invalid_edges=True, separator='|', quoting=0)
2223
self.assertEqual(config.max_token_count, 10)
2324
self.assertEqual(config.max_token_size, 200_000_000) # Max token size argument is converted to megabytes
2425
self.assertEqual(config.max_buffer_size, 500_000_000) # Buffer size argument is converted to megabytes
2526
self.assertEqual(config.enforce_schema, True)
27+
self.assertEqual(config.id_type, 'INTEGER')
2628
self.assertEqual(config.skip_invalid_nodes, True)
2729
self.assertEqual(config.skip_invalid_edges, True)
2830
self.assertEqual(config.store_node_identifiers, False)

test/test_label.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,5 +46,5 @@ def test02_process_header_with_schema(self):
4646
self.assertEqual(label.entity_str, 'LabelTest')
4747
self.assertEqual(label.prop_count, 2)
4848
self.assertEqual(label.entities_count, 2)
49-
self.assertEqual(label.types[0].name, 'ID')
49+
self.assertEqual(label.types[0].name, 'ID_STRING')
5050
self.assertEqual(label.types[1].name, 'STRING')

0 commit comments

Comments
 (0)