
Commit 2b148c8

pub

Author: cuidapeng
1 parent f9a2d19 commit 2b148c8

File tree

11 files changed: +1248 -1 lines changed


.gitignore

Lines changed: 29 additions & 0 deletions

# Created by .ignore support plugin (hsz.mobi)
### Java template
# Compiled class file
*.class

# Log file
*.log

# BlueJ files
*.ctxt

# Mobile Tools for Java (J2ME)
.mtj.tmp/

# Package Files #
*.jar
*.war
*.nar
*.ear
*.zip
*.tar.gz
*.rar

# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid*

.idea
target
*.iml
README.md

Lines changed: 196 additions & 1 deletion

# flink-connector-elasticsearch-source

## Why

[Elasticsearch | Apache Flink](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/elasticsearch/) / [Elasticsearch | Apache Flink](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/elasticsearch/) / [flink-connector-elasticsearch7](https://github.com/apache/flink/tree/release-1.13.1/flink-connectors/flink-connector-elasticsearch7) only provide sinks; none of them supports reading from Elasticsearch, either as a DataStream source or as a source table.

## How

* [Hadoop Compatibility | Apache Flink](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/dataset/hadoop_compatibility/)
* [elasticsearch-hadoop](https://github.com/elastic/elasticsearch-hadoop) / [Elasticsearch for Apache Hadoop](https://www.elastic.co/guide/en/elasticsearch/hadoop/7.x/mapreduce.html)

In short, elasticsearch-hadoop's `EsInputFormat` is run through Flink's Hadoop compatibility layer, as sketched below.
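
A minimal DataSet-level sketch of that combination, assuming the stock `flink-hadoop-compatibility` and `elasticsearch-hadoop` APIs (this class is illustrative and not part of the commit):

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.elasticsearch.hadoop.mr.EsInputFormat;

public class EsInputFormatSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // elasticsearch-hadoop configuration keys, the same ones used in the WITH clauses below.
        JobConf conf = new JobConf();
        conf.set("es.resource", "flink_es_table/_doc");
        conf.set("es.nodes", "127.0.0.1:9200");
        conf.set("es.query", "?q=*");
        conf.set("es.nodes.wan.only", "true");

        // EsInputFormat yields (document id, document body) pairs.
        DataSet<Tuple2<Text, MapWritable>> docs = env.createInput(
                HadoopInputs.createHadoopInput(
                        new EsInputFormat<Text, MapWritable>(),
                        Text.class, MapWritable.class, conf));

        docs.print(); // print() triggers execution in the DataSet API
    }
}
```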

## Install

```shell
mvn package -Dmaven.test.skip=true
cp target/flink-connector-elasticsearch-hadoop-1.0.jar /opt/flink/lib/
```

## Use

```sql
CREATE TABLE flink_es_table(
_metadata ROW<_index STRING,_type STRING,_id STRING>
) WITH (
'connector.type'='elasticsearch',
'es.resource'='flink_es_table/_doc',
'es.nodes'='127.0.0.1:9200',
'es.port'='9200',
'es.query'='?q=*',
'es.nodes.client.only'='false',
'es.nodes.discovery'='false',
'es.nodes.wan.only'='true'
);

SELECT _index,_type,_id FROM flink_es_table;
```

## Detail

The `es.*` keys in the WITH clause are elasticsearch-hadoop configuration options; see:

[Elasticsearch for Apache Hadoop](https://www.elastic.co/guide/en/elasticsearch/hadoop/7.x/mapreduce.html)

[Configuration](https://www.elastic.co/guide/en/elasticsearch/hadoop/7.x/configuration.html)

## Test

### init

`docker run -d --name es7 -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" docker.elastic.co/elasticsearch/elasticsearch:7.10.2-amd64`

`curl -XPOST --header "Content-Type: application/json" "http://127.0.0.1:9200/_bulk" --data-binary @data/flink_es_table`

```json
{"took":640,"errors":false,"items":[{"index":{"_index":"flink_es_table","_type":"_doc","_id":"es_id_1","_version":1,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":0,"_primary_term":1,"status":201}},{"create":{"_index":"flink_es_table","_type":"_doc","_id":"es_id_2","_version":1,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":1,"_primary_term":1,"status":201}}]}
```

### run

`org.apache.flink.connector.elasticsearch.table.ElasticsearchTableSourceTest`

output

```sql
CREATE TABLE flink_es_table(
_metadata ROW<_index STRING,_type STRING,_id STRING>,
int_key INT,int_array ARRAY<INT>,int_object MAP<STRING,INT>,int_nested ARRAY<ROW<key_3 INT,key_4 INT>>,
string_key STRING,string_array ARRAY<STRING>,string_object MAP<STRING,STRING>,string_nested ARRAY<ROW<key_3 STRING,key_4 STRING>>,
double_key DOUBLE,double_array ARRAY<DOUBLE>,double_object MAP<STRING,DOUBLE>,double_nested ARRAY<ROW<key_3 DOUBLE,key_4 DOUBLE>>,
time_key TIMESTAMP,time_array ARRAY<TIMESTAMP>,time_object MAP<STRING,TIMESTAMP>,time_nested ARRAY<ROW<key_3 TIMESTAMP,key_4 TIMESTAMP>>,
bool_key BOOLEAN,bool_array ARRAY<BOOLEAN>,bool_object MAP<STRING,BOOLEAN>,bool_nested ARRAY<ROW<key_3 BOOLEAN,key_4 BOOLEAN>>
) WITH (
'connector.type'='elasticsearch',
'es.resource'='flink_es_table/_doc',
'es.nodes'='k8s.cuidp.top:9201',
'es.port'='9201',
'es.query'='?q=*',
'es.nodes.client.only'='false',
'es.nodes.discovery'='false',
'es.nodes.wan.only'='true'
)

SELECT _index,_type,_id,int_key,int_array,int_object,int_nested,int_key,int_array,int_object,int_nested,string_key,string_array,string_object,string_nested,time_key,time_array,time_object,time_nested,bool_key,bool_array,bool_object,bool_nested FROM flink_es_table

-- result rows:
+I[flink_es_table, _doc, es_id_1, 10, [11, 12], {key_2=14, key_1=13}, [+I[15, 16], +I[17, 18]], 10, [11, 12], {key_2=14, key_1=13}, [+I[15, 16], +I[17, 18]], str0, [str1, str2], {key_2=str4, key_1=str3}, [+I[str5, str6], +I[str7, str8]], 2021-01-10 00:00:00.0, [2021-01-11T00:00, 2021-01-12T00:00], {key_2=2021-01-14 00:00:00.0, key_1=2021-01-13 00:00:00.0}, [+I[2021-01-15 00:00:00.0, 2021-01-16 00:00:00.0], +I[2021-01-17 00:00:00.0, 2021-01-18 00:00:00.0]], true, [true, false], {key_2=false, key_1=true}, [+I[true, false], +I[false, true]]]
+I[flink_es_table, _doc, es_id_2, 20, [21, 22], {key_2=24, key_1=23}, [+I[25, 26], +I[27, 28]], 20, [21, 22], {key_2=24, key_1=23}, [+I[25, 26], +I[27, 28]], str0, [str1, str2], {key_2=str4, key_1=str3}, [+I[str5, str6], +I[str7, str8]], 2021-01-20 00:00:00.0, [2021-01-21T00:00, 2021-01-22T00:00], {key_2=2021-01-24 00:00:00.0, key_1=2021-01-23 00:00:00.0}, [+I[2021-01-25 00:00:00.0, 2021-01-26 00:00:00.0], +I[2021-01-27 00:00:00.0, 2021-01-28 00:00:00.0]], true, [true, false], {key_2=false, key_1=true}, [+I[true, false], +I[false, true]]]
```
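
The test class itself is not included in this README; a sketch of how such a run could be driven through the standard Flink Table API (only the connector options are taken from this commit, the rest is assumed):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ElasticsearchTableSourceSketch {
    public static void main(String[] args) {
        // Batch mode: the Elasticsearch read is a bounded input.
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inBatchMode().build());

        tEnv.executeSql(
                "CREATE TABLE flink_es_table(" +
                " _metadata ROW<_index STRING,_type STRING,_id STRING>," +
                " int_key INT" +
                ") WITH (" +
                " 'connector.type'='elasticsearch'," +
                " 'es.resource'='flink_es_table/_doc'," +
                " 'es.nodes'='127.0.0.1:9200'," +
                " 'es.port'='9200'," +
                " 'es.query'='?q=*'," +
                " 'es.nodes.wan.only'='true')");

        // Prints rows in the +I[...] changelog format shown above.
        tEnv.executeSql("SELECT _index,_type,_id,int_key FROM flink_es_table").print();
    }
}
```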

## Read And Write (just for example)

For a plain copy between clusters, [Remote clusters](https://www.elastic.co/guide/en/elasticsearch/reference/7.x/modules-remote-clusters.html) and the [Reindex API](https://www.elastic.co/guide/en/elasticsearch/reference/7.x/docs-reindex.html#reindex-from-remote) already cover this; the steps below only demonstrate the connector.

`./bin/sql-client.sh`

### create elasticsearch source table

```sql
CREATE TABLE flink_es_table(
_metadata ROW<_index STRING,_type STRING,_id STRING>,
int_key INT,int_array ARRAY<INT>,int_object MAP<STRING,INT>,int_nested ARRAY<ROW<key_3 INT,key_4 INT>>,
string_key STRING,string_array ARRAY<STRING>,string_object MAP<STRING,STRING>,string_nested ARRAY<ROW<key_3 STRING,key_4 STRING>>,
double_key DOUBLE,double_array ARRAY<DOUBLE>,double_object MAP<STRING,DOUBLE>,double_nested ARRAY<ROW<key_3 DOUBLE,key_4 DOUBLE>>,
time_key TIMESTAMP,time_array ARRAY<TIMESTAMP>,time_object MAP<STRING,TIMESTAMP>,time_nested ARRAY<ROW<key_3 TIMESTAMP,key_4 TIMESTAMP>>,
bool_key BOOLEAN,bool_array ARRAY<BOOLEAN>,bool_object MAP<STRING,BOOLEAN>,bool_nested ARRAY<ROW<key_3 BOOLEAN,key_4 BOOLEAN>>
) WITH (
'connector.type'='elasticsearch',
'es.resource'='flink_es_table/_doc',
'es.nodes'='127.0.0.1:9200',
'es.port'='9200',
'es.query'='?q=*',
'es.nodes.client.only'='false',
'es.nodes.discovery'='false',
'es.nodes.wan.only'='true'
);
```

### create elasticsearch sink table

* original

The copied document's `_source` keeps the `id` field (the `id` column is used as the document `_id` and also stored in the body):

```sql
CREATE TABLE flink_es_table_copy_sink(
id STRING,
int_key INT,int_array ARRAY<INT>,int_object MAP<STRING,INT>,int_nested ARRAY<ROW<key_3 INT,key_4 INT>>,
PRIMARY KEY (`id`) NOT ENFORCED) WITH (
'connector'='elasticsearch-7',
'index'='flink_es_table_copy',
'hosts'='127.0.0.1:9200'
);
```

* custom

With the custom `elasticsearch-7-ignore` connector, `ignore-fields` drops the listed columns from the written `_source`, so the copied document has no `id` field (the `id` column still drives the document `_id`):

```sql
CREATE TABLE flink_es_table_copy_sink(
id STRING,
int_key INT,int_array ARRAY<INT>,int_object MAP<STRING,INT>,int_nested ARRAY<ROW<key_3 INT,key_4 INT>>,
PRIMARY KEY (`id`) NOT ENFORCED) WITH (
'connector'='elasticsearch-7-ignore',
'index'='flink_es_table_copy',
'hosts'='127.0.0.1:9200',
'ignore-fields'='id'
);
```
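
A guess at what `ignore-fields` amounts to inside the sink (the actual implementation is not shown in this commit; this helper is purely illustrative): the listed columns are stripped from the document body before it is indexed.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class IgnoreFieldsSketch {
    /** Returns a copy of the document body without the ignored fields. */
    public static Map<String, Object> strip(Map<String, Object> doc, Set<String> ignoreFields) {
        Map<String, Object> out = new HashMap<>(doc);
        out.keySet().removeAll(ignoreFields); // e.g. drops "id" so it is not stored in _source
        return out;
    }
}
```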
139+
140+
### source->sink
141+
142+
```sql
143+
insert into flink_es_table_copy_sink SELECT _id,int_key,int_array,int_object,int_nested FROM flink_es_table;
144+
```
145+
146+
147+
148+
#### search copy
149+
150+
```sql
151+
CREATE TABLE flink_es_table_copy_source(
152+
_metadata ROW<_index STRING,_type STRING,_id STRING>,
153+
int_key INT,int_array ARRAY<INT>,int_object MAP<STRING,INT>,int_nested ARRAY<ROW<key_3 INT,key_4 INT>>
154+
) WITH (
155+
'connector.type'='elasticsearch',
156+
'es.resource'='flink_es_table_copy/_doc',
157+
'es.nodes'='127.0.0.1:9200',
158+
'es.port'='9200',
159+
'es.query'='?q=*',
160+
'es.nodes.client.only'='false',
161+
'es.nodes.discovery'='false',
162+
'es.nodes.wan.only'='true'
163+
);
164+
165+
select * from flink_es_table_copy_source;
166+
```

#### result

```shell
SQL Query Result (Table)
Table program finished. Page: Last of 1    Updated: 11:25:32.615

_metadata                        int_key
+I[flink_es_table_copy, _doc,~        10
+I[flink_es_table_copy, _doc,~        20

Q Quit     + Inc Refresh    G Goto Page    N Next Page    O Open Row
R Refresh  - Dec Refresh    L Last Page    P Prev Page

---
_metadata (ROW<`_index` STRING, `_type` STRING, `_id` STRING>):
+I[flink_es_table_copy, _doc, es_id_1]
int_key (INT):
10
int_array (ARRAY<INT>):
[11, 12]
int_object (MAP<STRING, INT>):
{key_2=14, key_1=13}
int_nested (ARRAY<ROW<`key_3` INT, `key_4` INT>>):
[+I[15, 16], +I[17, 18]]
```

## More

An `OutputRowFunction` plus a `DeserializationSchema<RowData>` should make it possible to load other Hadoop InputFormats as Tables in the same way; the sketch below illustrates the value-to-row half of that idea.
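
Purely illustrative (neither the class nor the method below is this repo's API; `MapWritable` stands in for whatever value type an InputFormat produces): converting a Hadoop value into a Flink `RowData` could look like this.

```java
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

public class MapWritableToRowData {
    /** Copies the named fields of an ES document into a RowData, as strings. */
    public static RowData toRowData(MapWritable doc, String... fields) {
        GenericRowData row = new GenericRowData(fields.length);
        for (int i = 0; i < fields.length; i++) {
            Writable value = doc.get(new Text(fields[i]));
            row.setField(i, value == null ? null : StringData.fromString(value.toString()));
        }
        return row;
    }
}
```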

data/flink_es_table

Lines changed: 4 additions & 0 deletions

{ "index" : { "_index" : "flink_es_table", "_id" : "es_id_1" } }
{"bool_key":true,"bool_array":[true,false],"bool_object":{"key_1":true,"key_2":false},"bool_nested":[{"key_3":true,"key_4":false},{"key_3":false,"key_4":true}],"int_key":10,"int_array":[11,12],"int_object":{"key_1":13,"key_2":14},"int_nested":[{"key_3":15,"key_4":16},{"key_3":17,"key_4":18}],"double_key":10,"double_array":[11,12],"double_object":{"key_1":13,"key_2":14},"double_nested":[{"key_3":15,"key_4":16},{"key_3":17,"key_4":18}],"string_key":"str0","string_array":["str1","str2"],"string_object":{"key_1":"str3","key_2":"str4"},"string_nested":[{"key_3":"str5","key_4":"str6"},{"key_3":"str7","key_4":"str8"}],"time_key":"2021-01-10T00:00:00","time_array":["2021-01-11T00:00:00","2021-01-12T00:00:00"],"time_object":{"key_1":"2021-01-13T00:00:00","key_2":"2021-01-14T00:00:00"},"time_nested":[{"key_3":"2021-01-15T00:00:00","key_4":"2021-01-16T00:00:00"},{"key_3":"2021-01-17T00:00:00","key_4":"2021-01-18T00:00:00"}]}
{ "create" : { "_index" : "flink_es_table", "_id" : "es_id_2" } }
{"bool_key":true,"bool_array":[true,false],"bool_object":{"key_1":true,"key_2":false},"bool_nested":[{"key_3":true,"key_4":false},{"key_3":false,"key_4":true}],"int_key":20,"int_array":[21,22],"int_object":{"key_1":23,"key_2":24},"int_nested":[{"key_3":25,"key_4":26},{"key_3":27,"key_4":28}],"double_key":20,"double_array":[21,22],"double_object":{"key_1":23,"key_2":24},"double_nested":[{"key_3":25,"key_4":26},{"key_3":27,"key_4":28}],"string_key":"str0","string_array":["str1","str2"],"string_object":{"key_1":"str3","key_2":"str4"},"string_nested":[{"key_3":"str5","key_4":"str6"},{"key_3":"str7","key_4":"str8"}],"time_key":"2021-01-20T00:00:00","time_array":["2021-01-21T00:00:00","2021-01-22T00:00:00"],"time_object":{"key_1":"2021-01-23T00:00:00","key_2":"2021-01-24T00:00:00"},"time_nested":[{"key_3":"2021-01-25T00:00:00","key_4":"2021-01-26T00:00:00"},{"key_3":"2021-01-27T00:00:00","key_4":"2021-01-28T00:00:00"}]}
