Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 79 additions & 51 deletions ddpui/api/transform_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,15 @@
EditDbtOperationPayload,
LockCanvasRequestSchema,
LockCanvasResponseSchema,
GenerateGraphSchema,
)
from ddpui.utils.taskprogress import TaskProgress
from ddpui.core.transformfunctions import validate_operation_config, check_canvas_locked
from ddpui.core.transformfunctions import (
validate_operation_config,
check_canvas_locked,
chat_to_graph,
create_operation_node,
)
from ddpui.api.warehouse_api import get_warehouse_data
from ddpui.models.tasks import TaskProgressHashPrefix

Expand Down Expand Up @@ -153,70 +159,71 @@ def post_construct_dbt_model_operation(request, payload: CreateDbtModelPayload):
if not org_warehouse:
raise HttpError(404, "please setup your warehouse first")

# make sure the orgdbt here is the one we create locally
orgdbt = OrgDbt.objects.filter(org=org, gitrepo_url=None).first()
if not orgdbt:
raise HttpError(404, "dbt workspace not setup")
# # make sure the orgdbt here is the one we create locally
# orgdbt = OrgDbt.objects.filter(org=org, gitrepo_url=None).first()
# if not orgdbt:
# raise HttpError(404, "dbt workspace not setup")

check_canvas_locked(orguser, payload.canvas_lock_id)
# check_canvas_locked(orguser, payload.canvas_lock_id)

if payload.op_type not in dbtautomation_service.OPERATIONS_DICT.keys():
raise HttpError(422, "Operation not supported")
# if payload.op_type not in dbtautomation_service.OPERATIONS_DICT.keys():
# raise HttpError(422, "Operation not supported")

is_multi_input_op = payload.op_type in ["join", "unionall"]
# is_multi_input_op = payload.op_type in ["join", "unionall"]

target_model = None
if payload.target_model_uuid:
target_model = OrgDbtModel.objects.filter(uuid=payload.target_model_uuid).first()
# target_model = None
# if payload.target_model_uuid:
# target_model = OrgDbtModel.objects.filter(uuid=payload.target_model_uuid).first()

if not target_model:
target_model = OrgDbtModel.objects.create(
uuid=uuid.uuid4(),
orgdbt=orgdbt,
under_construction=True,
)
# if not target_model:
# target_model = OrgDbtModel.objects.create(
# uuid=uuid.uuid4(),
# orgdbt=orgdbt,
# under_construction=True,
# )

# only under construction models can be modified
if not target_model.under_construction:
raise HttpError(422, "model is locked")
# # only under construction models can be modified
# if not target_model.under_construction:
# raise HttpError(422, "model is locked")

current_operations_chained = OrgDbtOperation.objects.filter(dbtmodel=target_model).count()
# current_operations_chained = OrgDbtOperation.objects.filter(dbtmodel=target_model).count()

final_config, all_input_models = validate_operation_config(
payload, target_model, is_multi_input_op, current_operations_chained
)
# final_config, all_input_models = validate_operation_config(
# payload, target_model, is_multi_input_op, current_operations_chained
# )

# we create edges only with tables/models
for source in all_input_models:
edge = DbtEdge.objects.filter(from_node=source, to_node=target_model).first()
if not edge:
DbtEdge.objects.create(
from_node=source,
to_node=target_model,
)
# # we create edges only with tables/models
# for source in all_input_models:
# edge = DbtEdge.objects.filter(from_node=source, to_node=target_model).first()
# if not edge:
# DbtEdge.objects.create(
# from_node=source,
# to_node=target_model,
# )

output_cols = dbtautomation_service.get_output_cols_for_operation(
org_warehouse, payload.op_type, final_config["config"].copy()
)
logger.info("creating operation")

dbt_op = OrgDbtOperation.objects.create(
dbtmodel=target_model,
uuid=uuid.uuid4(),
seq=current_operations_chained + 1,
config=final_config,
output_cols=output_cols,
)
# output_cols = dbtautomation_service.get_output_cols_for_operation(
# org_warehouse, payload.op_type, final_config["config"].copy()
# )
# logger.info("creating operation")

logger.info("created operation")
# dbt_op = OrgDbtOperation.objects.create(
# dbtmodel=target_model,
# uuid=uuid.uuid4(),
# seq=current_operations_chained + 1,
# config=final_config,
# output_cols=output_cols,
# )

# save the output cols of the latest operation to the dbt model
target_model.output_cols = dbt_op.output_cols
target_model.save()
# logger.info("created operation")

logger.info("updated output cols for the model")
# # save the output cols of the latest operation to the dbt model
# target_model.output_cols = dbt_op.output_cols
# target_model.save()

return from_orgdbtoperation(dbt_op, chain_length=dbt_op.seq)
# logger.info("updated output cols for the model")
return create_operation_node(org, orguser, payload)

# return from_orgdbtoperation(dbt_op, chain_length=dbt_op.seq)


@transform_router.put(
Expand Down Expand Up @@ -731,3 +738,24 @@ def post_unlock_canvas(request, payload: LockCanvasRequestSchema):
canvas_lock.delete()

return {"success": 1}


@transform_router.post(
"/agent/chat/",
auth=auth.CustomAuthMiddleware(),
)
@has_permission(["can_edit_dbt_model"])
def post_generate_graph(request, payload: GenerateGraphSchema):
"""
Unlock the canvas for the org
"""
orguser: OrgUser = request.orguser
org = orguser.org

orgdbt = OrgDbt.objects.filter(org=org, gitrepo_url=None).first()
if not orgdbt:
raise HttpError(404, "dbt workspace not setup")

reply = chat_to_graph(org, orguser, payload)

return reply
Loading
Loading