diff --git a/demo/bulkload_cfg_report.ctl b/demo/bulkload_cfg_report.ctl new file mode 100644 index 0000000..547a83c --- /dev/null +++ b/demo/bulkload_cfg_report.ctl @@ -0,0 +1,47 @@ + +# [.]table_name +OUTPUT = cfg_report +# database action +TRUNCATE=false +ON_DUPLICATE_KEEP = NEW +#FILTER=bulkload_filter_cellmr_to_5s + +# Input data location (absolute path) +INPUT = /home/postgres/data/cfg_report.csv + +#INPUT = /home/postgres/data/stat_5m_uemr.csv + +LOGFILE=/home/postgres/bulkload_test/logs/cfg_report.log +PARSE_BADFILE=/home/postgres/bulkload_test/logs/cfg_report.bad.log +DUPLICATE_BADFILE=/home/postgres/bulkload_test/logs/cfg_report.duplicate.log + +# Input file type +TYPE = CSV +# CSV Fomart Parameters +QUOTE = "\"" +ESCAPE = \ +DELIMITER = "|" +NULL="" + +SKIP=0 +LIMIT=INFINITE +PARSE_ERRORS=INFINITE +DUPLICATE_ERRORS=INFINITE + +WRITER=PARALLEL +VERBOSE =NO + +#CSV_FIELDS=table_name,params_name,description,params_value,default_value,range_operator +#CSV_FIELDS=table_name,params_name,description,params_value,default_value,range_operator,oo,ii,pp +CSV_FIELDS=table_name,params_name,description +#CSV_FIELDS=table_name + + +#FINAL_FIELDS=table_name,params_name,description,table_name,params_name,pp +#FINAL_FIELDS=table_name,params_name,description,params_value,default_value,range_operator +#FINAL_FIELDS=params_name,,params_value,default_value,range_operator +#FINAL_FIELDS=table_name,params_name,PP,VV,NN,KK +FINAL_FIELDS=table_name,params_name,PP,MM +#FINAL_FIELDS=table_name,"",params_name,,description,params_value,default_value,range_operator +#FINAL_FIELDS= + diff --git a/lib/parser_csv.c b/lib/parser_csv.c index 3109944..2287b3a 100644 --- a/lib/parser_csv.c +++ b/lib/parser_csv.c @@ -30,6 +30,23 @@ #define INITIAL_BUF_LEN (1024 * 1024) #define MAX_BUF_LEN (16 * INITIAL_BUF_LEN) +typedef struct CsvFieldSelector +{ + /** + * @brief Give the field in the csv file a name.The names are separated by separators + * The number of names can be different from the number of fields in the csv file. + */ + char *csv_fields_name; + + /** + * @brief The name selected in the property csv_fields_name. + */ + char *final_fields_name; + int *field_index_array; //array of index that identity location field in the csv_fields_name + List *csv_field_list; + List *final_field_list; +} CsvFieldSelector; + typedef struct CSVParser { Parser base; @@ -105,6 +122,9 @@ typedef struct CSVParser char *null; /**< NULL value string */ List *fnn_name; /**< list of NOT NULL column names */ bool *fnn; /**< array of NOT NULL column flag */ + char **final_fileds; //store the final data imported into the table + CsvFieldSelector csv_field_selector; + } CSVParser; static void CSVParserInit(CSVParser *self, Checker *checker, const char *infile, TupleDesc desc, bool multi_process, Oid collation); @@ -116,6 +136,9 @@ static void CSVParserDumpRecord(CSVParser *self, FILE *fp, char *badfile); static void ExtractValuesFromCSV(CSVParser *self, int parsed_field); +static void InitFieldIndexArray(CSVParser *self); +static void SplitString(CSVParser *self, char *delimiter); +static int getFieldsCount(CSVParser *self); /* * @brief Copies specified area in the record buffer to the field buffer. * @@ -183,6 +206,7 @@ static void CSVParserInit(CSVParser *self, Checker *checker, const char *infile, TupleDesc desc, bool multi_process, Oid collation) { TupleCheckStatus status; + int fields_count; /* * set default values @@ -255,10 +279,14 @@ CSVParserInit(CSVParser *self, Checker *checker, const char *infile, TupleDesc d self->used_len = 0; self->field_buf = palloc(self->buf_len); self->next = self->rec_buf; - self->fields = palloc(Max(self->former.maxfields, 1) * sizeof(char *)); + InitFieldIndexArray(self); + fields_count = getFieldsCount(self); + self->fields = palloc(Max(fields_count, 1) * sizeof(char *)); self->fields[0] = NULL; self->null_len = strlen(self->null); self->eof = false; + self->final_fileds = palloc(Max(self->former.maxfields, 1) * sizeof(char *)); + self->final_fileds[0] = NULL; } /** @@ -355,7 +383,9 @@ CSVParserRead(CSVParser *self, Checker *checker) int src; /* Index to the next source */ int field_num = 0; /* Number of self->fields already parsed */ int parsed_field; - + int fields_count; + bool enable_continue = false; + int total_csvfile_fields_count = 0; /* * If EOF found in the previous calls, returns zero. */ @@ -420,6 +450,8 @@ CSVParserRead(CSVParser *self, Checker *checker) self->base.parsing_field = 1; self->field_buf[dst] = '\0'; self->fields[field_num] = self->field_buf + dst; + + fields_count = getFieldsCount(self); /* * Loop for each input character to parse record buffer. @@ -573,8 +605,11 @@ CSVParserRead(CSVParser *self, Checker *checker) } else if (inCR) { - appendToField(self, &dst, &src, i - src - 1); - checkFieldIsNull(self, field_num, i - field_head - 1); + if(self->csv_field_selector.field_index_array == NULL || fields_count >= total_csvfile_fields_count+1) + { + appendToField(self, &dst, &src, i - src - 1); + checkFieldIsNull(self, field_num, i - field_head - 1); + } self->rec_buf[i - 1] = '\0'; if (c != '\n') @@ -611,9 +646,11 @@ CSVParserRead(CSVParser *self, Checker *checker) * Even if no line feed is found at the end of the input file, there will * be no problem because we have already added line feed at EOF test above. */ - appendToField(self, &dst, &src, i - src); - - checkFieldIsNull(self, field_num, i - field_head); + if(self->csv_field_selector.field_index_array == NULL || fields_count >= total_csvfile_fields_count+1) + { + appendToField(self, &dst, &src, i - src); + checkFieldIsNull(self, field_num, i - field_head); + } /* * Line feed other than a quote mark is the record delimiter. Record parse @@ -625,18 +662,37 @@ CSVParserRead(CSVParser *self, Checker *checker) } else if (c == delim) { - appendToField(self, &dst, &src, i - src); - - checkFieldIsNull(self, field_num, i - field_head); - + total_csvfile_fields_count++; /* * If then number of columns specified in the input record exceeds the * number of columns of the copy target table, then the value of the last * column of the table will be overwritten by extra columns in the input * data successively. */ - if (field_num + 1 < self->former.maxfields) + + if(self->csv_field_selector.field_index_array != NULL ) + { + if(enable_continue) + continue; //after parse finished, It will execute other code, so break can not used; + + if (field_num + 1 >= fields_count) + { + enable_continue = true; + } + } + + appendToField(self, &dst, &src, i - src); + + //if(self->csv_field_selector.field_index_array == NULL ) + checkFieldIsNull(self, field_num, i - field_head); + + if(enable_continue) + continue; + + if (field_num + 1 < fields_count) field_num++; + + self->base.parsing_field++; /* @@ -663,6 +719,7 @@ CSVParserRead(CSVParser *self, Checker *checker) ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("unterminated CSV quoted field"))); + /* * We accept a record only for new lines as input of the functions without * the arguments. @@ -673,7 +730,7 @@ CSVParserRead(CSVParser *self, Checker *checker) /* * It's an error if the number of self->fields exceeds the number of valid column. */ - if (self->base.parsing_field > self->former.maxfields) + if (self->csv_field_selector.field_index_array == NULL && self->base.parsing_field > self->former.maxfields) { ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("extra data after last expected column"))); @@ -681,7 +738,7 @@ CSVParserRead(CSVParser *self, Checker *checker) /* * Error, if the number of self->fields is less than the number of valid columns. */ - if (self->base.parsing_field < self->former.minfields) + if (self->csv_field_selector.field_index_array == NULL && self->base.parsing_field < self->former.minfields) { if (self->filter.funcstr) ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), @@ -706,8 +763,50 @@ CSVParserRead(CSVParser *self, Checker *checker) self->base.parsing_field = i + 1; self->fields[i] = CheckerConversion(checker, self->fields[i]); } - - ExtractValuesFromCSV(self, parsed_field); + + /* + if field_index_array is not null, then we find the value that fields by field_index_array. + Then set the values into final_fileds. + Only Both of csv_fields_name and final_fields_name are not NULL. csv_field_selector is Effective. + */ + if(self->csv_field_selector.csv_fields_name != NULL && self->csv_field_selector.final_fields_name != NULL) + { + char *field_name; + int final_field_count; + final_field_count = list_length(self->csv_field_selector.final_field_list); + for(i = 0;iformer.maxfields; i++) + { + int j = self->csv_field_selector.field_index_array[i]; + if(j>=0 && jfinal_fileds[i] = self->fields[j]; + } + else + { + if(i < final_field_count) + { + field_name =(char *) list_nth(self->csv_field_selector.final_field_list,i); + if(strcmp(field_name,"NULL")==0 || strcmp(field_name,"")==0) + self->final_fileds[i] = NULL; + else + self->final_fileds[i] = CheckerConversion(checker, field_name); + } + else + self->final_fileds[i] = NULL; + } + } + } + else + { + self->final_fileds = self->fields; + } + + //ExtractValuesFromCSV(self, parsed_field); + /* + The function ExtractValuesFromCSV has changed. + Last version of bulkload handle self->fields, but this version handle self->final_fileds; + */ + ExtractValuesFromCSV(self, self->former.maxfields); self->base.parsing_field = -1; if (self->filter.funcstr) @@ -757,9 +856,19 @@ CSVParserParam(CSVParser *self, const char *keyword, char *value) ASSERT_ONCE(!self->filter.funcstr); self->filter.funcstr = pstrdup(value); } + else if(CompareKeyword(keyword,"CSV_FIELDS")) + { + ASSERT_ONCE(!self->csv_field_selector.csv_fields_name); + self->csv_field_selector.csv_fields_name = pstrdup(value); + } + else if(CompareKeyword(keyword,"FINAL_FIELDS")) + { + ASSERT_ONCE(!self->csv_field_selector.final_fields_name); + self->csv_field_selector.final_fields_name = pstrdup(value); + } else return false; /* unknown parameter */ - + return true; } @@ -791,7 +900,7 @@ CSVParserDumpParams(CSVParser *self) str = QuoteString(self->null); appendStringInfo(&buf, "NULL = %s\n", str); pfree(str); - + if (self->filter.funcstr) appendStringInfo(&buf, "FILTER = %s\n", self->filter.funcstr); @@ -802,6 +911,34 @@ CSVParserDumpParams(CSVParser *self) pfree(str); } + if(self->csv_field_selector.csv_fields_name != NULL) + { + str = QuoteString(self->csv_field_selector.csv_fields_name); + appendStringInfo(&buf,"CSV_FIELDS = %s\n",str); + pfree(str); + + foreach(name,self->csv_field_selector.csv_field_list) + { + str = QuoteString(lfirst(name)); + appendStringInfo(&buf, "CSV_FIELD_LIST = %s\n", str); + pfree(str); + } + } + + if(self->csv_field_selector.final_fields_name != NULL) + { + str = QuoteString(self->csv_field_selector.final_fields_name); + appendStringInfo(&buf,"FINAL_FIELDS = %s\n",str); + pfree(str); + + foreach(name, self->csv_field_selector.final_field_list) + { + str = QuoteString(lfirst(name)); + appendStringInfo(&buf, "FINAL_FIELD_LIST = %s\n", str); + pfree(str); + } + } + LoggerLog(INFO, buf.data, 0); pfree(buf.data); } @@ -861,9 +998,9 @@ ExtractValuesFromCSV(CSVParser *self, int parsed_field) self->base.parsing_field = i + 1; /* 1 origin */ index = self->former.attnum[i]; /* Physical column index */ - if (self->fields[i] || self->fnn[index]) + if (self->final_fileds[i] || self->fnn[index]) { - value = TupleFormerValue(&self->former, self->fields[i], index); + value = TupleFormerValue(&self->former, self->final_fileds[i], index); isnull = false; } else @@ -886,3 +1023,91 @@ ExtractValuesFromCSV(CSVParser *self, int parsed_field) self->former.values[i] = self->filter.defaultValues[index]; } } + +/** + * @brief + * This function initialize the field_index_array. + * field_index_array store indexs that where field in final_fields_name appear in csv_fields_name. + * + * @param CSVParser *self + * @return void + */ +static void +InitFieldIndexArray(CSVParser *self) +{ + int final_fields_index; + int i; + int j; + ListCell *csv_name; + ListCell *final_name; + + if(self->csv_field_selector.final_fields_name == NULL || self->csv_field_selector.csv_fields_name == NULL) + return ; + + SplitString(self,","); + + self->csv_field_selector.field_index_array = palloc(self->former.maxfields * sizeof(int)); + + for(i = 0; iformer.maxfields; i++) + { + self->csv_field_selector.field_index_array[i] = -1; + } + + final_fields_index = -1; + foreach(final_name, self->csv_field_selector.final_field_list) + { + ++final_fields_index; + + if(strcmp(lfirst(final_name),"NULL")==0 || lfirst(final_name) == NULL) + continue; + + if(final_fields_index+1 > self->former.maxfields) + break; + + j = -1; + foreach(csv_name,self->csv_field_selector.csv_field_list) + { + ++j; + if(strcmp(lfirst(final_name),lfirst(csv_name))==0) + { + self->csv_field_selector.field_index_array[final_fields_index] = j; + break; + } + } + } + +} + +/** + * @brief + * This function split a string into an array by delimeter + */ +static void +SplitString(CSVParser *self, char *delimiter) +{ + char *field_name; + char *str; + + str = self->csv_field_selector.csv_fields_name; + for (field_name = strsep(&str, delimiter); field_name != NULL; field_name = strsep(&str, delimiter)) { + self->csv_field_selector.csv_field_list = lappend(self->csv_field_selector.csv_field_list, field_name); + } + + str = self->csv_field_selector.final_fields_name; + for (field_name = strsep(&str, delimiter); field_name != NULL; field_name = strsep(&str, delimiter)) { + self->csv_field_selector.final_field_list = lappend(self->csv_field_selector.final_field_list, field_name); + } +} + +static int +getFieldsCount(CSVParser *self) +{ + if (self->csv_field_selector.field_index_array == NULL) + { + return self->former.maxfields; + } + else + { + return list_length(self->csv_field_selector.csv_field_list); + } +}