Skip to content

Commit 4c1336b

Browse files
author
Julien Ruaux
committed
feat: Added support for DELETE statements
1 parent 029214b commit 4c1336b

File tree

9 files changed

+191
-67
lines changed

9 files changed

+191
-67
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
<github.owner>redis-field-engineering</github.owner>
2727
<github.repo>redis-sql</github.repo>
2828

29-
<lettucemod.version>3.1.6</lettucemod.version>
29+
<lettucemod.version>3.2.0</lettucemod.version>
3030
<lettuce.version>6.2.2.RELEASE</lettuce.version>
3131
<testcontainers-redis.version>1.6.2</testcontainers-redis.version>
3232
<ulid.version>5.1.0</ulid.version>

src/main/java/com/redis/trino/RediSearchMetadata.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import static com.google.common.collect.ImmutableSet.toImmutableSet;
3030
import static io.airlift.slice.SliceUtf8.getCodePointAt;
3131
import static io.trino.spi.StandardErrorCode.INVALID_FUNCTION_ARGUMENT;
32+
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
33+
import static io.trino.spi.connector.RetryMode.NO_RETRIES;
3234
import static io.trino.spi.expression.StandardFunctions.LIKE_FUNCTION_NAME;
3335
import static java.util.Objects.requireNonNull;
3436

@@ -244,6 +246,25 @@ public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session,
244246
return Optional.empty();
245247
}
246248

249+
@Override
250+
public RediSearchColumnHandle getDeleteRowIdColumnHandle(ConnectorSession session,
251+
ConnectorTableHandle tableHandle) {
252+
return RediSearchBuiltinField.ID.getColumnHandle();
253+
}
254+
255+
@Override
256+
public RediSearchTableHandle beginDelete(ConnectorSession session, ConnectorTableHandle tableHandle,
257+
RetryMode retryMode) {
258+
if (retryMode != NO_RETRIES) {
259+
throw new TrinoException(NOT_SUPPORTED, "This connector does not support query retries");
260+
}
261+
return (RediSearchTableHandle) tableHandle;
262+
}
263+
264+
@Override
265+
public void finishDelete(ConnectorSession session, ConnectorTableHandle tableHandle, Collection<Slice> fragments) {
266+
}
267+
247268
@Override
248269
public ConnectorTableProperties getTableProperties(ConnectorSession session, ConnectorTableHandle table) {
249270
RediSearchTableHandle handle = (RediSearchTableHandle) table;

src/main/java/com/redis/trino/RediSearchPageSink.java

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import java.util.HashMap;
4444
import java.util.List;
4545
import java.util.Map;
46+
import java.util.Optional;
4647
import java.util.Random;
4748
import java.util.concurrent.CompletableFuture;
4849

@@ -51,6 +52,9 @@
5152
import com.google.common.primitives.Shorts;
5253
import com.google.common.primitives.SignedBytes;
5354
import com.redis.lettucemod.api.StatefulRedisModulesConnection;
55+
import com.redis.lettucemod.search.CreateOptions;
56+
import com.redis.lettucemod.search.CreateOptions.DataType;
57+
import com.redis.lettucemod.search.IndexInfo;
5458

5559
import io.airlift.slice.Slice;
5660
import io.lettuce.core.LettuceFutures;
@@ -78,26 +82,27 @@
7882

7983
public class RediSearchPageSink implements ConnectorPageSink {
8084

81-
private final RediSearchSession rediSearchSession;
85+
private final RediSearchSession session;
8286
private final SchemaTableName schemaTableName;
8387
private final List<RediSearchColumnHandle> columns;
8488
private final UlidFactory factory = UlidFactory.newInstance(new Random());
8589

8690
public RediSearchPageSink(RediSearchSession rediSearchSession, SchemaTableName schemaTableName,
8791
List<RediSearchColumnHandle> columns) {
88-
this.rediSearchSession = rediSearchSession;
92+
this.session = rediSearchSession;
8993
this.schemaTableName = schemaTableName;
9094
this.columns = columns;
9195
}
9296

9397
@Override
9498
public CompletableFuture<?> appendPage(Page page) {
95-
StatefulRedisModulesConnection<String, String> connection = rediSearchSession.getConnection();
99+
String prefix = prefix().orElse(schemaTableName.getTableName());
100+
StatefulRedisModulesConnection<String, String> connection = session.getConnection();
96101
connection.setAutoFlushCommands(false);
97102
List<RedisFuture<?>> futures = new ArrayList<>();
98103
for (int position = 0; position < page.getPositionCount(); position++) {
99104
Map<String, String> map = new HashMap<>();
100-
String key = schemaTableName.getTableName() + ":" + factory.create().toString();
105+
String key = prefix + ":" + factory.create().toString();
101106
for (int channel = 0; channel < page.getChannelCount(); channel++) {
102107
RediSearchColumnHandle column = columns.get(channel);
103108
Block block = page.getBlock(channel);
@@ -115,6 +120,31 @@ public CompletableFuture<?> appendPage(Page page) {
115120
return NOT_BLOCKED;
116121
}
117122

123+
private Optional<String> prefix() {
124+
try {
125+
RediSearchTable table = session.getTable(schemaTableName);
126+
IndexInfo indexInfo = table.getIndexInfo();
127+
CreateOptions<String, String> options = indexInfo.getIndexOptions();
128+
Optional<DataType> on = options.getOn();
129+
if (on.isEmpty() || on.get() != DataType.HASH) {
130+
return Optional.empty();
131+
}
132+
if (options.getPrefixes().isEmpty()) {
133+
return Optional.empty();
134+
}
135+
String prefix = options.getPrefixes().get(0);
136+
if (prefix.equals("*")) {
137+
return Optional.empty();
138+
}
139+
if (prefix.endsWith(":")) {
140+
return Optional.of(prefix.substring(0, prefix.length() - 1));
141+
}
142+
return Optional.of(prefix);
143+
} catch (Exception e) {
144+
return Optional.empty();
145+
}
146+
}
147+
118148
private String getObjectValue(Type type, Block block, int position) {
119149
if (type.equals(BooleanType.BOOLEAN)) {
120150
return String.valueOf(type.getBoolean(block, position));

src/main/java/com/redis/trino/RediSearchPageSource.java

Lines changed: 37 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,40 +27,49 @@
2727

2828
import java.io.IOException;
2929
import java.io.OutputStream;
30+
import java.util.ArrayList;
31+
import java.util.Collection;
32+
import java.util.Collections;
3033
import java.util.Iterator;
3134
import java.util.List;
35+
import java.util.concurrent.CompletableFuture;
3236
import java.util.stream.Collectors;
3337

3438
import com.fasterxml.jackson.core.JsonFactory;
3539
import com.fasterxml.jackson.core.JsonGenerator;
3640
import com.redis.lettucemod.search.Document;
3741

42+
import io.airlift.slice.Slice;
3843
import io.airlift.slice.SliceOutput;
3944
import io.trino.spi.Page;
4045
import io.trino.spi.PageBuilder;
46+
import io.trino.spi.block.Block;
4147
import io.trino.spi.block.BlockBuilder;
42-
import io.trino.spi.connector.ConnectorPageSource;
48+
import io.trino.spi.connector.UpdatablePageSource;
4349
import io.trino.spi.type.Type;
4450

45-
public class RediSearchPageSource implements ConnectorPageSource {
51+
public class RediSearchPageSource implements UpdatablePageSource {
4652

4753
private static final int ROWS_PER_REQUEST = 1024;
4854

4955
private final RediSearchPageSourceResultWriter writer = new RediSearchPageSourceResultWriter();
56+
private final RediSearchSession session;
5057
private final Iterator<Document<String, String>> cursor;
51-
private final List<String> columnNames;
58+
private final String[] columnNames;
5259
private final List<Type> columnTypes;
60+
private final PageBuilder pageBuilder;
61+
5362
private Document<String, String> currentDoc;
5463
private long count;
5564
private boolean finished;
5665

57-
private final PageBuilder pageBuilder;
58-
59-
public RediSearchPageSource(RediSearchSession rediSearchSession, RediSearchTableHandle tableHandle,
66+
public RediSearchPageSource(RediSearchSession session, RediSearchTableHandle table,
6067
List<RediSearchColumnHandle> columns) {
61-
this.columnNames = columns.stream().map(RediSearchColumnHandle::getName).collect(Collectors.toUnmodifiableList());
62-
this.columnTypes = columns.stream().map(RediSearchColumnHandle::getType).collect(Collectors.toUnmodifiableList());
63-
this.cursor = rediSearchSession.search(tableHandle, columns).iterator();
68+
this.session = session;
69+
this.columnNames = columns.stream().map(RediSearchColumnHandle::getName).toArray(String[]::new);
70+
this.columnTypes = columns.stream().map(RediSearchColumnHandle::getType)
71+
.collect(Collectors.toUnmodifiableList());
72+
this.cursor = session.search(table, columnNames).iterator();
6473
this.currentDoc = null;
6574
this.pageBuilder = new PageBuilder(columnTypes);
6675
}
@@ -100,7 +109,7 @@ public Page getNextPage() {
100109
pageBuilder.declarePosition();
101110
for (int column = 0; column < columnTypes.size(); column++) {
102111
BlockBuilder output = pageBuilder.getBlockBuilder(column);
103-
String columnName = columnNames.get(column);
112+
String columnName = columnNames[column];
104113
String value = currentValue(columnName);
105114
if (value == null) {
106115
output.appendNull();
@@ -115,6 +124,17 @@ public Page getNextPage() {
115124
return page;
116125
}
117126

127+
@Override
128+
public void deleteRows(Block rowIds) {
129+
List<String> docIds = new ArrayList<>(rowIds.getPositionCount());
130+
for (int i = 0; i < rowIds.getPositionCount(); i++) {
131+
int len = rowIds.getSliceLength(i);
132+
Slice slice = rowIds.getSlice(i, 0, len);
133+
docIds.add(slice.toStringUtf8());
134+
}
135+
session.deleteDocs(docIds);
136+
}
137+
118138
private String currentValue(String columnName) {
119139
if (RediSearchBuiltinField.isBuiltinColumn(columnName)) {
120140
if (RediSearchBuiltinField.ID.getName().equals(columnName)) {
@@ -127,6 +147,13 @@ private String currentValue(String columnName) {
127147
return currentDoc.get(columnName);
128148
}
129149

150+
@Override
151+
public CompletableFuture<Collection<Slice>> finish() {
152+
CompletableFuture<Collection<Slice>> future = new CompletableFuture<>();
153+
future.complete(Collections.emptyList());
154+
return future;
155+
}
156+
130157
public static JsonGenerator createJsonGenerator(JsonFactory factory, SliceOutput output) throws IOException {
131158
return factory.createGenerator((OutputStream) output);
132159
}

src/main/java/com/redis/trino/RediSearchQueryBuilder.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -242,9 +242,9 @@ public Optional<Group> group(RediSearchTableHandle table) {
242242
List<RediSearchAggregation> aggregates = table.getMetricAggregations();
243243
List<String> groupFields = new ArrayList<>();
244244
if (terms != null && !terms.isEmpty()) {
245-
groupFields = terms.stream().map(RediSearchAggregationTerm::getTerm).collect(Collectors.toList());
245+
groupFields = terms.stream().map(RediSearchAggregationTerm::getTerm).collect(Collectors.toUnmodifiableList());
246246
}
247-
List<Reducer> reducers = aggregates.stream().map(this::reducer).collect(Collectors.toList());
247+
List<Reducer> reducers = aggregates.stream().map(this::reducer).collect(Collectors.toUnmodifiableList());
248248
if (reducers.isEmpty()) {
249249
return Optional.empty();
250250
}

src/main/java/com/redis/trino/RediSearchSession.java

Lines changed: 52 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,10 @@
3434

3535
import java.io.File;
3636
import java.util.Collections;
37+
import java.util.HashMap;
3738
import java.util.HashSet;
3839
import java.util.List;
40+
import java.util.Map;
3941
import java.util.Optional;
4042
import java.util.Set;
4143
import java.util.concurrent.TimeUnit;
@@ -48,10 +50,12 @@
4850
import com.google.common.collect.ImmutableSet;
4951
import com.google.common.util.concurrent.UncheckedExecutionException;
5052
import com.redis.lettucemod.api.StatefulRedisModulesConnection;
53+
import com.redis.lettucemod.search.AggregateOperation;
5154
import com.redis.lettucemod.search.AggregateWithCursorResults;
5255
import com.redis.lettucemod.search.CreateOptions;
5356
import com.redis.lettucemod.search.Document;
5457
import com.redis.lettucemod.search.Field;
58+
import com.redis.lettucemod.search.Group;
5559
import com.redis.lettucemod.search.IndexInfo;
5660
import com.redis.lettucemod.search.SearchResults;
5761
import com.redis.lettucemod.util.ClientBuilder;
@@ -161,6 +165,12 @@ public Set<String> getAllTables() throws SchemaNotFoundException {
161165
return builder.build();
162166
}
163167

168+
/**
169+
*
170+
* @param schemaTableName SchemaTableName to load
171+
* @return RediSearchTable describing the RediSearch index
172+
* @throws TableNotFoundException if no index by that name was found
173+
*/
164174
public RediSearchTable getTable(SchemaTableName tableName) throws TableNotFoundException {
165175
try {
166176
return tableCache.getUnchecked(tableName);
@@ -172,13 +182,13 @@ public RediSearchTable getTable(SchemaTableName tableName) throws TableNotFoundE
172182

173183
@SuppressWarnings("unchecked")
174184
public void createTable(SchemaTableName schemaTableName, List<RediSearchColumnHandle> columns) {
175-
String tableName = schemaTableName.getTableName();
176-
if (!connection.sync().ftList().contains(tableName)) {
185+
String index = index(schemaTableName);
186+
if (!connection.sync().ftList().contains(index)) {
177187
List<Field<String>> fields = columns.stream().filter(c -> !c.getName().equals("_id"))
178-
.map(c -> buildField(c.getName(), c.getType())).collect(Collectors.toList());
188+
.map(c -> buildField(c.getName(), c.getType())).collect(Collectors.toUnmodifiableList());
179189
CreateOptions.Builder<String, String> options = CreateOptions.<String, String>builder();
180-
options.prefix(tableName + ":");
181-
connection.sync().ftCreate(tableName, options.build(), fields.toArray(Field[]::new));
190+
options.prefix(index + ":");
191+
connection.sync().ftCreate(index, options.build(), fields.toArray(Field[]::new));
182192
}
183193
}
184194

@@ -210,19 +220,26 @@ public void dropColumn(SchemaTableName schemaTableName, String columnName) {
210220
throw new TrinoException(NOT_SUPPORTED, "This connector does not support dropping columns");
211221
}
212222

213-
private RediSearchTable loadTableSchema(SchemaTableName schemaTableName) {
223+
/**
224+
*
225+
* @param schemaTableName SchemaTableName to load
226+
* @return RediSearchTable describing the RediSearch index
227+
* @throws TableNotFoundException if no index by that name was found
228+
*/
229+
private RediSearchTable loadTableSchema(SchemaTableName schemaTableName) throws TableNotFoundException {
214230
String index = schemaTableName.getTableName();
215-
Optional<IndexInfo> indexInfo = indexInfo(index);
216-
if (indexInfo.isEmpty()) {
231+
Optional<IndexInfo> indexInfoOptional = indexInfo(index);
232+
if (indexInfoOptional.isEmpty()) {
217233
throw new TableNotFoundException(schemaTableName, format("Index '%s' not found", index), null);
218234
}
235+
IndexInfo indexInfo = indexInfoOptional.get();
219236
Set<String> fields = new HashSet<>();
220237
ImmutableList.Builder<RediSearchColumnHandle> columns = ImmutableList.builder();
221238
for (RediSearchBuiltinField builtinfield : RediSearchBuiltinField.values()) {
222239
fields.add(builtinfield.getName());
223240
columns.add(builtinfield.getColumnHandle());
224241
}
225-
for (Field<String> indexedField : indexInfo.get().getFields()) {
242+
for (Field<String> indexedField : indexInfo.getFields()) {
226243
RediSearchColumnHandle column = buildColumnHandle(indexedField);
227244
fields.add(column.getName());
228245
columns.add(column);
@@ -237,8 +254,9 @@ private RediSearchTable loadTableSchema(SchemaTableName schemaTableName) {
237254
fields.add(docField);
238255
}
239256
}
240-
return new RediSearchTable(new RediSearchTableHandle(RediSearchTableHandle.Type.SEARCH, schemaTableName),
241-
columns.build());
257+
RediSearchTableHandle tableHandle = new RediSearchTableHandle(RediSearchTableHandle.Type.SEARCH,
258+
schemaTableName);
259+
return new RediSearchTable(tableHandle, columns.build(), indexInfo);
242260
}
243261

244262
private Optional<IndexInfo> indexInfo(String index) {
@@ -278,8 +296,7 @@ private Type columnType(TypeSignature typeSignature) {
278296
return typeManager.fromSqlType(typeSignature.toString());
279297
}
280298

281-
public SearchResults<String, String> search(RediSearchTableHandle tableHandle,
282-
List<RediSearchColumnHandle> columns) {
299+
public SearchResults<String, String> search(RediSearchTableHandle tableHandle, String[] columns) {
283300
Search search = translator.search(tableHandle, columns);
284301
log.info("Running %s", search);
285302
return connection.sync().ftSearch(search.getIndex(), search.getQuery(), search.getOptions());
@@ -288,20 +305,32 @@ public SearchResults<String, String> search(RediSearchTableHandle tableHandle,
288305
public AggregateWithCursorResults<String> aggregate(RediSearchTableHandle table) {
289306
Aggregation aggregation = translator.aggregate(table);
290307
log.info("Running %s", aggregation);
291-
return connection.sync().ftAggregate(aggregation.getIndex(), aggregation.getQuery(),
292-
aggregation.getCursorOptions(), aggregation.getOptions());
308+
AggregateWithCursorResults<String> results = connection.sync().ftAggregate(aggregation.getIndex(),
309+
aggregation.getQuery(), aggregation.getCursorOptions(), aggregation.getOptions());
310+
List<AggregateOperation<?, ?>> groupBys = aggregation.getOptions().getOperations().stream()
311+
.filter(o -> o.getType() == AggregateOperation.Type.GROUP).collect(Collectors.toUnmodifiableList());
312+
if (results.isEmpty() && !groupBys.isEmpty()) {
313+
Group groupBy = (Group) groupBys.get(0);
314+
Optional<String> as = groupBy.getReducers()[0].getAs();
315+
if (as.isPresent()) {
316+
Map<String, Object> doc = new HashMap<>();
317+
doc.put(as.get(), 0);
318+
results.add(doc);
319+
}
320+
}
321+
return results;
293322
}
294323

295324
public AggregateWithCursorResults<String> cursorRead(RediSearchTableHandle tableHandle, long cursor) {
296-
String index = index(tableHandle);
325+
String index = index(tableHandle.getSchemaTableName());
297326
if (config.getCursorCount() > 0) {
298327
return connection.sync().ftCursorRead(index, cursor, config.getCursorCount());
299328
}
300329
return connection.sync().ftCursorRead(index, cursor);
301330
}
302331

303-
private String index(RediSearchTableHandle tableHandle) {
304-
return tableHandle.getSchemaTableName().getTableName();
332+
private String index(SchemaTableName schemaTableName) {
333+
return schemaTableName.getTableName();
305334
}
306335

307336
private Field<String> buildField(String columnName, Type columnType) {
@@ -381,7 +410,11 @@ private TypeSignature varcharType() {
381410
}
382411

383412
public void cursorDelete(RediSearchTableHandle tableHandle, long cursor) {
384-
connection.sync().ftCursorDelete(index(tableHandle), cursor);
413+
connection.sync().ftCursorDelete(index(tableHandle.getSchemaTableName()), cursor);
414+
}
415+
416+
public Long deleteDocs(List<String> docIds) {
417+
return connection.sync().del(docIds.toArray(String[]::new));
385418
}
386419

387420
}

0 commit comments

Comments
 (0)