diff --git a/src/airflow_dags/dags/india/consume-nwp-dag.py b/src/airflow_dags/dags/india/consume-nwp-dag.py index 6de7639..7ed1260 100644 --- a/src/airflow_dags/dags/india/consume-nwp-dag.py +++ b/src/airflow_dags/dags/india/consume-nwp-dag.py @@ -25,26 +25,36 @@ "max_active_tasks": 10, } -nwp_consumer = ContainerDefinition( - name="nwp-consumer", - container_image="ghcr.io/openclimatefix/nwp-consumer", - container_tag="1.1.10", - container_env={ +default_args = { + "name":"nwp-consumer", + "container_image":"ghcr.io/openclimatefix/nwp-consumer", + "container_tag":"1.1.32", + "container_env":{ "CONCURRENCY": "false", "LOGLEVEL": "DEBUG", }, - container_secret_env={ + "container_secret_env":{ f"{env}/data/nwp-consumer": [ "ECMWF_REALTIME_S3_ACCESS_KEY", "ECMWF_REALTIME_S3_ACCESS_SECRET", "METOFFICE_API_KEY", ], }, - container_command=["consume"], - container_cpu=512, - container_memory=1024, - domain="india", -) + "container_command":["consume"], + "container_cpu":512, + "container_memory":1024, + "domain":"india", +} + +# GFS and MetOffice consumers +nwp_consumer = ContainerDefinition(**default_args) + +# ECWMF consumer +ecmwf_args = default_args.copy() +ecmwf_args["name"] = "nwp-consumer-ecmwf" +ecmwf_args["container_memory"] = 2048 +ecmwf_args["container_cpu"] = 1024 +nwp_consumer_ecwmf = ContainerDefinition(**ecmwf_args) @dag( @@ -61,7 +71,7 @@ def nwp_consumer_dag() -> None: consume_ecmwf_op = EcsAutoRegisterRunTaskOperator( airflow_task_id="consume-ecmwf-nwp", - container_def=nwp_consumer, + container_def=nwp_consumer_ecwmf, env_overrides={ "MODEL_REPOSITORY": "ecmwf-realtime", "MODEL": "hres-ifs-india", @@ -86,6 +96,8 @@ def nwp_consumer_dag() -> None: env_overrides={ "MODEL_REPOSITORY": "gfs", "ZARRDIR": f"s3://india-nwp-{env}/gfs/data", + # SDE has nans + "ALLOWED_VALIDATION_FAILURE_PERCENTAGE": "0.07", }, on_failure_callback=slack_message_callback( f"⚠️🇮🇳 The {get_task_link()} failed."