diff --git a/feapder_pipelines/pipelines/pgsql_pipeline.py b/feapder_pipelines/pipelines/pgsql_pipeline.py index 73a77b3..8b19515 100644 --- a/feapder_pipelines/pipelines/pgsql_pipeline.py +++ b/feapder_pipelines/pipelines/pgsql_pipeline.py @@ -19,7 +19,8 @@ class PgsqlPipeline(BasePipeline): def __init__(self): self._to_db = None - self._indexes_cols_cached = {} + # self._indexes_cols_cached = {} + self._constraint_cached = {} @property def to_db(self): @@ -28,19 +29,16 @@ def to_db(self): return self._to_db - def __get_indexes_cols(self, table): - if table not in self._indexes_cols_cached: - get_indexes_sql = tools.get_primaryKey_col_sql(table) - indexes_cols = self.to_db.find(sql=get_indexes_sql) or "id" - log.info(f"主键列名:{indexes_cols[0][0]}") - if indexes_cols: - indexes_cols = indexes_cols[0][0] - else: - log.error(f"无法找到主键列名") - raise Exception("请确保数据库有主键") - self._indexes_cols_cached[table] = indexes_cols + def __get_constraint_name(self, table): + if table not in self._constraint_cached: + get_constraint_sql = tools.get_constraint_name_sql(table) + constraint_names = self.to_db.find(sql=get_constraint_sql) + if not constraint_names: + log.error(f"无法找到唯一约束名") + raise Exception("请确保数据库有唯一约束") + self._constraint_cached[table] = constraint_names[0][0] - return self._indexes_cols_cached[table] + return self._constraint_cached[table] def save_items(self, table, items: List[Dict]) -> bool: """ @@ -54,7 +52,7 @@ def save_items(self, table, items: List[Dict]) -> bool: """ sql, datas = tools.make_batch_sql( - table, items, indexes_cols=self.__get_indexes_cols(table) + table, items, constraint_name=self.__get_constraint_name(table) ) add_count = self.to_db.add_batch(sql, datas) # log.info(sql) @@ -82,7 +80,7 @@ def update_items(self, table, items: List[Dict], update_keys=Tuple) -> bool: table, items, update_columns=update_keys or list(items[0].keys()), - indexes_cols=self.__get_indexes_cols(table), + constraint_name=self.__get_constraint_name(table), ) # log.info(sql) update_count = self.to_db.add_batch(sql, datas) diff --git a/feapder_pipelines/utils/pgsql_tool.py b/feapder_pipelines/utils/pgsql_tool.py index 6971502..43fdad0 100644 --- a/feapder_pipelines/utils/pgsql_tool.py +++ b/feapder_pipelines/utils/pgsql_tool.py @@ -89,18 +89,18 @@ def get_constraint_name_sql(table): --------- @result: """ - sql = "SELECT indexname FROM pg_indexes WHERE tablename = '{table}'" + sql = "SELECT indexname FROM pg_indexes WHERE tablename = '{table}' order by indexname" sql = sql.format(table=table).replace("None", "null") return sql def make_insert_sql( - table, - data, - auto_update=False, - update_columns=(), - insert_ignore=False, - indexes_cols=(), + table, + data, + auto_update=False, + update_columns=(), + insert_ignore=False, + constraint_name=(), ): """ @summary: 适用于PostgreSQL @@ -110,7 +110,7 @@ def make_insert_sql( @param auto_update: 更新所有所有列的开关 @param update_columns: 需要更新的列 默认全部,当指定值时,auto_update设置无效,当duplicate key冲突时更新指定的列 @param insert_ignore: 更新策略:数据存在则忽略本条数据 - @param indexes_cols: 索引列 + @param constraint_name: 约束名称,用于唯一性判断,通过alter table add constraint unique() 创建 --------- @result: """ @@ -128,8 +128,8 @@ def make_insert_sql( ["{key}=excluded.{key}".format(key=key) for key in update_columns] ) sql = ( - "insert into {table} {keys} values {values} on conflict({indexes_cols}) DO UPDATE SET %s" - % update_columns_ + "insert into {table} {keys} values {values} on conflict ON CONSTRAINT {constraint_name} DO UPDATE SET %s" + % update_columns_ ) elif auto_update: @@ -137,16 +137,16 @@ def make_insert_sql( ["{key}=excluded.{key}".format(key=key) for key in keys] ) sql = ( - "insert into {table} {keys} values {values} on conflict({indexes_cols}) DO UPDATE SET %s" - % update_all_columns_ + "insert into {table} {keys} values {values} on conflict ON CONSTRAINT {constraint_name} DO UPDATE SET %s" + % update_all_columns_ ) elif insert_ignore: - sql = "insert into {table} {keys} values {values} on conflict({indexes_cols}) DO NOTHING" + sql = "insert into {table} {keys} values {values} on conflict ON CONSTRAINT {constraint_name} DO NOTHING" else: sql = "insert into {table} {keys} values {values}" - + sql = sql.format( - table=table, keys=keys, values=values, indexes_cols=indexes_cols + table=table, keys=keys, values=values, constraint_name=constraint_name ).replace("None", "null") return sql @@ -185,7 +185,7 @@ def make_batch_sql( auto_update=False, update_columns=(), update_columns_value=(), - indexes_cols=(), + constraint_name=(), ): """ @summary: 生产批量的sql @@ -195,7 +195,7 @@ def make_batch_sql( @param auto_update: 使用的是replace into, 为完全覆盖已存在的数据 @param update_columns: 需要更新的列 默认全部,当指定值时,auto_update设置无效,当duplicate key冲突时更新指定的列 @param update_columns_value: 需要更新的列的值 默认为datas里边对应的值, 注意 如果值为字符串类型 需要主动加单引号, 如 update_columns_value=("'test'",) - @param indexes_cols: 索引列 str + @param constraint_name: 约束名称,用于唯一性判断,通过alter table add constraint unique() 创建 --------- @result: """ @@ -235,30 +235,34 @@ def make_batch_sql( update_columns_ = ", ".join( ["{key}=excluded.{key}".format(key=key) for key in update_columns] ) - sql = "insert into {table} {keys} values {values_placeholder} ON CONFLICT({indexes_cols}) DO UPDATE SET {update_columns}".format( + # sql = "insert into {table} {keys} values {values_placeholder} ON CONFLICT({indexes_cols}) DO UPDATE SET {update_columns}".format( + sql = "insert into {table} {keys} values {values_placeholder} on conflict ON CONSTRAINT {constraint_name} DO UPDATE SET {update_columns}".format( table=table, keys=keys, values_placeholder=values_placeholder, update_columns=update_columns_, - indexes_cols=indexes_cols, + constraint_name=constraint_name, ) elif auto_update: update_all_columns_ = ", ".join( ["{key}=excluded.{key}".format(key=key) for key in keys] ) - sql = "insert into {table} {keys} values {values_placeholder} on conflict({indexes_cols}) DO UPDATE SET {update_all_columns_}".format( + # sql = "insert into {table} {keys} values {values_placeholder} on conflict({indexes_cols}) DO UPDATE SET {update_all_columns_}".format( + sql = "insert into {table} {keys} values {values_placeholder} on conflict ON CONSTRAINT {constraint_name} DO UPDATE SET {update_all_columns_}".format( table=table, keys=keys, values_placeholder=values_placeholder, - indexes_cols=indexes_cols, + constraint_name=constraint_name, update_all_columns_=update_all_columns_, ) else: - sql = "insert into {table} {keys} values {values_placeholder} on conflict({indexes_cols}) do nothing".format( + # sql = "insert into {table} {keys} values {values_placeholder} on conflict({indexes_cols}) do nothing".format( + sql = "insert into {table} {keys} values {values_placeholder} on conflict ON CONSTRAINT {constraint_name} do nothing".format( table=table, keys=keys, values_placeholder=values_placeholder, - indexes_cols=indexes_cols, + constraint_name=constraint_name, ) return sql, values +