|
34 | 34 |
|
35 | 35 | import org.opensearch.LegacyESVersion;
|
36 | 36 | import org.opensearch.Version;
|
| 37 | +import org.hamcrest.MatcherAssert; |
37 | 38 | import org.opensearch.client.Request;
|
| 39 | +import org.opensearch.client.Response; |
38 | 40 | import org.opensearch.client.ResponseException;
|
39 | 41 | import org.opensearch.test.XContentTestUtils.JsonMapView;
|
40 | 42 |
|
| 43 | +import java.io.IOException; |
41 | 44 | import java.util.Map;
|
42 | 45 |
|
43 | 46 | import static org.opensearch.cluster.metadata.IndexNameExpressionResolver.SYSTEM_INDEX_ENFORCEMENT_VERSION;
|
| 47 | +import static org.hamcrest.Matchers.equalTo; |
44 | 48 | import static org.hamcrest.Matchers.hasKey;
|
45 | 49 | import static org.hamcrest.Matchers.is;
|
46 | 50 | import static org.hamcrest.Matchers.notNullValue;
|
@@ -68,25 +72,7 @@ public void testSystemIndicesUpgrades() throws Exception {
|
68 | 72 | }
|
69 | 73 | client().performRequest(bulk);
|
70 | 74 |
|
71 |
| - // start a async reindex job |
72 |
| - Request reindex = new Request("POST", "/_reindex"); |
73 |
| - reindex.setJsonEntity( |
74 |
| - "{\n" + |
75 |
| - " \"source\":{\n" + |
76 |
| - " \"index\":\"test_index_old\"\n" + |
77 |
| - " },\n" + |
78 |
| - " \"dest\":{\n" + |
79 |
| - " \"index\":\"test_index_reindex\"\n" + |
80 |
| - " }\n" + |
81 |
| - "}"); |
82 |
| - reindex.addParameter("wait_for_completion", "false"); |
83 |
| - Map<String, Object> response = entityAsMap(client().performRequest(reindex)); |
84 |
| - String taskId = (String) response.get("task"); |
85 |
| - |
86 |
| - // wait for task |
87 |
| - Request getTask = new Request("GET", "/_tasks/" + taskId); |
88 |
| - getTask.addParameter("wait_for_completion", "true"); |
89 |
| - client().performRequest(getTask); |
| 75 | + createAndVerifyStoredTask(); |
90 | 76 |
|
91 | 77 | // make sure .tasks index exists
|
92 | 78 | Request getTasksIndex = new Request("GET", "/.tasks");
|
@@ -121,6 +107,8 @@ public void testSystemIndicesUpgrades() throws Exception {
|
121 | 107 | assertThat(client().performRequest(putAliasRequest).getStatusLine().getStatusCode(), is(200));
|
122 | 108 | }
|
123 | 109 | } else if (CLUSTER_TYPE == ClusterType.UPGRADED) {
|
| 110 | + createAndVerifyStoredTask(); |
| 111 | + |
124 | 112 | assertBusy(() -> {
|
125 | 113 | Request clusterStateRequest = new Request("GET", "/_cluster/state/metadata");
|
126 | 114 | Map<String, Object> indices = new JsonMapView(entityAsMap(client().performRequest(clusterStateRequest)))
|
@@ -152,4 +140,29 @@ public void testSystemIndicesUpgrades() throws Exception {
|
152 | 140 | });
|
153 | 141 | }
|
154 | 142 | }
|
| 143 | + |
| 144 | + /** |
| 145 | + * Completed tasks get persisted into the .tasks index, so this method waits |
| 146 | + * until the task is completed in order to verify that it has been successfully |
| 147 | + * written to the index and can be retrieved. |
| 148 | + */ |
| 149 | + private static void createAndVerifyStoredTask() throws Exception { |
| 150 | + // Use update by query to create an async task |
| 151 | + final Request updateByQueryRequest = new Request("POST", "/test_index_old/_update_by_query"); |
| 152 | + updateByQueryRequest.addParameter("wait_for_completion", "false"); |
| 153 | + final Response updateByQueryResponse = client().performRequest(updateByQueryRequest); |
| 154 | + MatcherAssert.assertThat(updateByQueryResponse.getStatusLine().getStatusCode(), equalTo(200)); |
| 155 | + final String taskId = (String) entityAsMap(updateByQueryResponse).get("task"); |
| 156 | + |
| 157 | + // wait for task to complete |
| 158 | + waitUntil(() -> { |
| 159 | + try { |
| 160 | + final Response getTaskResponse = client().performRequest(new Request("GET", "/_tasks/" + taskId)); |
| 161 | + MatcherAssert.assertThat(getTaskResponse.getStatusLine().getStatusCode(), equalTo(200)); |
| 162 | + return (Boolean) entityAsMap(getTaskResponse).get("completed"); |
| 163 | + } catch (IOException e) { |
| 164 | + throw new AssertionError(e); |
| 165 | + } |
| 166 | + }); |
| 167 | + } |
155 | 168 | }
|
0 commit comments