Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 13 additions & 15 deletions feapder_pipelines/pipelines/pgsql_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
class PgsqlPipeline(BasePipeline):
def __init__(self):
self._to_db = None
self._indexes_cols_cached = {}
# self._indexes_cols_cached = {}
self._constraint_cached = {}

@property
def to_db(self):
Expand All @@ -28,19 +29,16 @@ def to_db(self):

return self._to_db

def __get_indexes_cols(self, table):
if table not in self._indexes_cols_cached:
get_indexes_sql = tools.get_primaryKey_col_sql(table)
indexes_cols = self.to_db.find(sql=get_indexes_sql) or "id"
log.info(f"主键列名:{indexes_cols[0][0]}")
if indexes_cols:
indexes_cols = indexes_cols[0][0]
else:
log.error(f"无法找到主键列名")
raise Exception("请确保数据库有主键")
self._indexes_cols_cached[table] = indexes_cols
def __get_constraint_name(self, table):
if table not in self._constraint_cached:
get_constraint_sql = tools.get_constraint_name_sql(table)
constraint_names = self.to_db.find(sql=get_constraint_sql)
if not constraint_names:
log.error(f"无法找到唯一约束名")
raise Exception("请确保数据库有唯一约束")
self._constraint_cached[table] = constraint_names[0][0]

return self._indexes_cols_cached[table]
return self._constraint_cached[table]

def save_items(self, table, items: List[Dict]) -> bool:
"""
Expand All @@ -54,7 +52,7 @@ def save_items(self, table, items: List[Dict]) -> bool:

"""
sql, datas = tools.make_batch_sql(
table, items, indexes_cols=self.__get_indexes_cols(table)
table, items, constraint_name=self.__get_constraint_name(table)
)
add_count = self.to_db.add_batch(sql, datas)
# log.info(sql)
Expand Down Expand Up @@ -82,7 +80,7 @@ def update_items(self, table, items: List[Dict], update_keys=Tuple) -> bool:
table,
items,
update_columns=update_keys or list(items[0].keys()),
indexes_cols=self.__get_indexes_cols(table),
constraint_name=self.__get_constraint_name(table),
)
# log.info(sql)
update_count = self.to_db.add_batch(sql, datas)
Expand Down
50 changes: 27 additions & 23 deletions feapder_pipelines/utils/pgsql_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,18 +89,18 @@ def get_constraint_name_sql(table):
---------
@result:
"""
sql = "SELECT indexname FROM pg_indexes WHERE tablename = '{table}'"
sql = "SELECT indexname FROM pg_indexes WHERE tablename = '{table}' order by indexname"
sql = sql.format(table=table).replace("None", "null")
return sql


def make_insert_sql(
table,
data,
auto_update=False,
update_columns=(),
insert_ignore=False,
indexes_cols=(),
table,
data,
auto_update=False,
update_columns=(),
insert_ignore=False,
constraint_name=(),
):
"""
@summary: 适用于PostgreSQL
Expand All @@ -110,7 +110,7 @@ def make_insert_sql(
@param auto_update: 更新所有所有列的开关
@param update_columns: 需要更新的列 默认全部,当指定值时,auto_update设置无效,当duplicate key冲突时更新指定的列
@param insert_ignore: 更新策略:数据存在则忽略本条数据
@param indexes_cols: 索引列
@param constraint_name: 约束名称,用于唯一性判断,通过alter table <table_name> add constraint <constraint_name> unique(<field1,filed2...>) 创建
---------
@result:
"""
Expand All @@ -128,25 +128,25 @@ def make_insert_sql(
["{key}=excluded.{key}".format(key=key) for key in update_columns]
)
sql = (
"insert into {table} {keys} values {values} on conflict({indexes_cols}) DO UPDATE SET %s"
% update_columns_
"insert into {table} {keys} values {values} on conflict ON CONSTRAINT {constraint_name} DO UPDATE SET %s"
% update_columns_
)

elif auto_update:
update_all_columns_ = ", ".join(
["{key}=excluded.{key}".format(key=key) for key in keys]
)
sql = (
"insert into {table} {keys} values {values} on conflict({indexes_cols}) DO UPDATE SET %s"
% update_all_columns_
"insert into {table} {keys} values {values} on conflict ON CONSTRAINT {constraint_name} DO UPDATE SET %s"
% update_all_columns_
)
elif insert_ignore:
sql = "insert into {table} {keys} values {values} on conflict({indexes_cols}) DO NOTHING"
sql = "insert into {table} {keys} values {values} on conflict ON CONSTRAINT {constraint_name} DO NOTHING"
else:
sql = "insert into {table} {keys} values {values}"

sql = sql.format(
table=table, keys=keys, values=values, indexes_cols=indexes_cols
table=table, keys=keys, values=values, constraint_name=constraint_name
).replace("None", "null")
return sql

Expand Down Expand Up @@ -185,7 +185,7 @@ def make_batch_sql(
auto_update=False,
update_columns=(),
update_columns_value=(),
indexes_cols=(),
constraint_name=(),
):
"""
@summary: 生产批量的sql
Expand All @@ -195,7 +195,7 @@ def make_batch_sql(
@param auto_update: 使用的是replace into, 为完全覆盖已存在的数据
@param update_columns: 需要更新的列 默认全部,当指定值时,auto_update设置无效,当duplicate key冲突时更新指定的列
@param update_columns_value: 需要更新的列的值 默认为datas里边对应的值, 注意 如果值为字符串类型 需要主动加单引号, 如 update_columns_value=("'test'",)
@param indexes_cols: 索引列 str
@param constraint_name: 约束名称,用于唯一性判断,通过alter table <table_name> add constraint <constraint_name> unique(<field1,filed2...>) 创建
---------
@result:
"""
Expand Down Expand Up @@ -235,30 +235,34 @@ def make_batch_sql(
update_columns_ = ", ".join(
["{key}=excluded.{key}".format(key=key) for key in update_columns]
)
sql = "insert into {table} {keys} values {values_placeholder} ON CONFLICT({indexes_cols}) DO UPDATE SET {update_columns}".format(
# sql = "insert into {table} {keys} values {values_placeholder} ON CONFLICT({indexes_cols}) DO UPDATE SET {update_columns}".format(
sql = "insert into {table} {keys} values {values_placeholder} on conflict ON CONSTRAINT {constraint_name} DO UPDATE SET {update_columns}".format(
table=table,
keys=keys,
values_placeholder=values_placeholder,
update_columns=update_columns_,
indexes_cols=indexes_cols,
constraint_name=constraint_name,
)
elif auto_update:
update_all_columns_ = ", ".join(
["{key}=excluded.{key}".format(key=key) for key in keys]
)
sql = "insert into {table} {keys} values {values_placeholder} on conflict({indexes_cols}) DO UPDATE SET {update_all_columns_}".format(
# sql = "insert into {table} {keys} values {values_placeholder} on conflict({indexes_cols}) DO UPDATE SET {update_all_columns_}".format(
sql = "insert into {table} {keys} values {values_placeholder} on conflict ON CONSTRAINT {constraint_name} DO UPDATE SET {update_all_columns_}".format(
table=table,
keys=keys,
values_placeholder=values_placeholder,
indexes_cols=indexes_cols,
constraint_name=constraint_name,
update_all_columns_=update_all_columns_,
)
else:
sql = "insert into {table} {keys} values {values_placeholder} on conflict({indexes_cols}) do nothing".format(
# sql = "insert into {table} {keys} values {values_placeholder} on conflict({indexes_cols}) do nothing".format(
sql = "insert into {table} {keys} values {values_placeholder} on conflict ON CONSTRAINT {constraint_name} do nothing".format(
table=table,
keys=keys,
values_placeholder=values_placeholder,
indexes_cols=indexes_cols,
constraint_name=constraint_name,
)

return sql, values