From 7b7c7942ae0117e8bad82153dee51c1b54e9a9ac Mon Sep 17 00:00:00 2001 From: Peter Dudfield <34686298+peterdudfield@users.noreply.github.com> Date: Mon, 1 Sep 2025 16:51:56 +0100 Subject: [PATCH 01/10] Update nwp-consumer container tag to 1.1.30 --- src/airflow_dags/dags/india/consume-nwp-dag.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/airflow_dags/dags/india/consume-nwp-dag.py b/src/airflow_dags/dags/india/consume-nwp-dag.py index 6de7639..994d047 100644 --- a/src/airflow_dags/dags/india/consume-nwp-dag.py +++ b/src/airflow_dags/dags/india/consume-nwp-dag.py @@ -28,7 +28,7 @@ nwp_consumer = ContainerDefinition( name="nwp-consumer", container_image="ghcr.io/openclimatefix/nwp-consumer", - container_tag="1.1.10", + container_tag="1.1.30", container_env={ "CONCURRENCY": "false", "LOGLEVEL": "DEBUG", From ad1f32e452faa35c2c290f61f6b928789f5b985a Mon Sep 17 00:00:00 2001 From: Peter Dudfield Date: Mon, 1 Sep 2025 20:45:18 +0100 Subject: [PATCH 02/10] add cpu and memory overrides --- src/airflow_dags/dags/india/consume-nwp-dag.py | 3 +++ .../plugins/operators/ecs_run_task_operator.py | 8 ++++++++ 2 files changed, 11 insertions(+) diff --git a/src/airflow_dags/dags/india/consume-nwp-dag.py b/src/airflow_dags/dags/india/consume-nwp-dag.py index 994d047..da49948 100644 --- a/src/airflow_dags/dags/india/consume-nwp-dag.py +++ b/src/airflow_dags/dags/india/consume-nwp-dag.py @@ -69,6 +69,8 @@ def nwp_consumer_dag() -> None: "ECMWF_REALTIME_S3_REGION": "eu-west-1", "ZARRDIR": f"s3://india-nwp-{env}/ecmwf/data", }, + cpu_override=1024, + memory_override=2048, max_active_tis_per_dag=10, on_failure_callback=slack_message_callback( f"⚠️🇮🇳 The {get_task_link()} failed. " @@ -86,6 +88,7 @@ def nwp_consumer_dag() -> None: env_overrides={ "MODEL_REPOSITORY": "gfs", "ZARRDIR": f"s3://india-nwp-{env}/gfs/data", + #TODO change nan threshold }, on_failure_callback=slack_message_callback( f"⚠️🇮🇳 The {get_task_link()} failed." diff --git a/src/airflow_dags/plugins/operators/ecs_run_task_operator.py b/src/airflow_dags/plugins/operators/ecs_run_task_operator.py index 00ee364..0020e27 100644 --- a/src/airflow_dags/plugins/operators/ecs_run_task_operator.py +++ b/src/airflow_dags/plugins/operators/ecs_run_task_operator.py @@ -30,6 +30,8 @@ def __init__( container_def: "ContainerDefinition", env_overrides: dict[str, str] | None = None, command_override: list[str] | None = None, + cpu_override: int | None = None, + memory_override: int | None = None, **kwargs: int | bool | str | dict[str, str] | list[str], ) -> None: """Create a new instance of the class.""" @@ -43,6 +45,12 @@ def __init__( if command_override: overrides_dict["command"] = command_override + if cpu_override: + overrides_dict["cpu"] = cpu_override + + if memory_override: + overrides_dict["memory"] = memory_override + networks_dict: dict[str, dict[str, list[str] | str]] = { "awsvpcConfiguration": { "subnets": [os.getenv("ECS_SUBNET", "")], From 57d3f427782019d20b16fdba21114cee25d1cb87 Mon Sep 17 00:00:00 2001 From: Peter Dudfield Date: Tue, 2 Sep 2025 14:32:55 +0100 Subject: [PATCH 03/10] use 1.1.31, and increase gfs nan threshold --- src/airflow_dags/dags/india/consume-nwp-dag.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/airflow_dags/dags/india/consume-nwp-dag.py b/src/airflow_dags/dags/india/consume-nwp-dag.py index da49948..3219cf8 100644 --- a/src/airflow_dags/dags/india/consume-nwp-dag.py +++ b/src/airflow_dags/dags/india/consume-nwp-dag.py @@ -28,7 +28,7 @@ nwp_consumer = ContainerDefinition( name="nwp-consumer", container_image="ghcr.io/openclimatefix/nwp-consumer", - container_tag="1.1.30", + container_tag="1.1.31", container_env={ "CONCURRENCY": "false", "LOGLEVEL": "DEBUG", @@ -88,7 +88,8 @@ def nwp_consumer_dag() -> None: env_overrides={ "MODEL_REPOSITORY": "gfs", "ZARRDIR": f"s3://india-nwp-{env}/gfs/data", - #TODO change nan threshold + # SDE has nans + "IMAGES_FAILING_NAN_CHECK_THRESHOLD": "0.07" }, on_failure_callback=slack_message_callback( f"⚠️🇮🇳 The {get_task_link()} failed." From 106a571854d5c1bdf3f62cb4d5b813612e46c3dc Mon Sep 17 00:00:00 2001 From: Peter Dudfield Date: Tue, 2 Sep 2025 15:23:52 +0100 Subject: [PATCH 04/10] create two different container definitions --- src/airflow_dags/dags/india/consume-nwp-dag.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/src/airflow_dags/dags/india/consume-nwp-dag.py b/src/airflow_dags/dags/india/consume-nwp-dag.py index 3219cf8..8b01ab3 100644 --- a/src/airflow_dags/dags/india/consume-nwp-dag.py +++ b/src/airflow_dags/dags/india/consume-nwp-dag.py @@ -25,7 +25,7 @@ "max_active_tasks": 10, } -nwp_consumer = ContainerDefinition( +default_args = dict( name="nwp-consumer", container_image="ghcr.io/openclimatefix/nwp-consumer", container_tag="1.1.31", @@ -46,6 +46,16 @@ domain="india", ) +# GFS and MetOffice consumers +nwp_consumer = ContainerDefinition(**default_args) + +# ECWMF consumer +ecmwf_args = default_args.copy() +ecmwf_args["name"] = "nwp-consumer-ecmwf" +ecmwf_args["container_memory"] = 2048 +ecmwf_args["container_cpu"] = 1024 +nwp_consumer_ecwmf = ContainerDefinition(**ecmwf_args) + @dag( dag_id="india-consume-nwp", @@ -61,7 +71,7 @@ def nwp_consumer_dag() -> None: consume_ecmwf_op = EcsAutoRegisterRunTaskOperator( airflow_task_id="consume-ecmwf-nwp", - container_def=nwp_consumer, + container_def=nwp_consumer_ecwmf, env_overrides={ "MODEL_REPOSITORY": "ecmwf-realtime", "MODEL": "hres-ifs-india", @@ -69,8 +79,6 @@ def nwp_consumer_dag() -> None: "ECMWF_REALTIME_S3_REGION": "eu-west-1", "ZARRDIR": f"s3://india-nwp-{env}/ecmwf/data", }, - cpu_override=1024, - memory_override=2048, max_active_tis_per_dag=10, on_failure_callback=slack_message_callback( f"⚠️🇮🇳 The {get_task_link()} failed. " @@ -89,7 +97,7 @@ def nwp_consumer_dag() -> None: "MODEL_REPOSITORY": "gfs", "ZARRDIR": f"s3://india-nwp-{env}/gfs/data", # SDE has nans - "IMAGES_FAILING_NAN_CHECK_THRESHOLD": "0.07" + "ALLOWED_VALIDATION_FAILURE_PERCENTAGE": "0.07" }, on_failure_callback=slack_message_callback( f"⚠️🇮🇳 The {get_task_link()} failed." From ffe339275fbf3b8fc1d73d3b0ea912d7366ca28e Mon Sep 17 00:00:00 2001 From: Peter Dudfield Date: Tue, 2 Sep 2025 15:37:55 +0100 Subject: [PATCH 05/10] lint --- src/airflow_dags/dags/india/consume-nwp-dag.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/airflow_dags/dags/india/consume-nwp-dag.py b/src/airflow_dags/dags/india/consume-nwp-dag.py index 8b01ab3..ce7ffda 100644 --- a/src/airflow_dags/dags/india/consume-nwp-dag.py +++ b/src/airflow_dags/dags/india/consume-nwp-dag.py @@ -97,7 +97,7 @@ def nwp_consumer_dag() -> None: "MODEL_REPOSITORY": "gfs", "ZARRDIR": f"s3://india-nwp-{env}/gfs/data", # SDE has nans - "ALLOWED_VALIDATION_FAILURE_PERCENTAGE": "0.07" + "ALLOWED_VALIDATION_FAILURE_PERCENTAGE": "0.07", }, on_failure_callback=slack_message_callback( f"⚠️🇮🇳 The {get_task_link()} failed." From d724d34258d1569ac1f64626a72da39a7262243d Mon Sep 17 00:00:00 2001 From: Peter Dudfield Date: Tue, 2 Sep 2025 16:29:39 +0100 Subject: [PATCH 06/10] lint --- .../dags/india/consume-nwp-dag.py | 23 ++++++++++--------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/src/airflow_dags/dags/india/consume-nwp-dag.py b/src/airflow_dags/dags/india/consume-nwp-dag.py index ce7ffda..1811117 100644 --- a/src/airflow_dags/dags/india/consume-nwp-dag.py +++ b/src/airflow_dags/dags/india/consume-nwp-dag.py @@ -25,26 +25,26 @@ "max_active_tasks": 10, } -default_args = dict( - name="nwp-consumer", - container_image="ghcr.io/openclimatefix/nwp-consumer", - container_tag="1.1.31", - container_env={ +default_args = { + "name":"nwp-consumer", + "container_image":"ghcr.io/openclimatefix/nwp-consumer", + "container_tag":"no-results", + "container_env":{ "CONCURRENCY": "false", "LOGLEVEL": "DEBUG", }, - container_secret_env={ + "container_secret_env":{ f"{env}/data/nwp-consumer": [ "ECMWF_REALTIME_S3_ACCESS_KEY", "ECMWF_REALTIME_S3_ACCESS_SECRET", "METOFFICE_API_KEY", ], }, - container_command=["consume"], - container_cpu=512, - container_memory=1024, - domain="india", -) + "container_command":["consume"], + "container_cpu":512, + "container_memory":1024, + "domain":"india", +} # GFS and MetOffice consumers nwp_consumer = ContainerDefinition(**default_args) @@ -117,6 +117,7 @@ def nwp_consumer_dag() -> None: "MODEL_REPOSITORY": "metoffice-datahub", "METOFFICE_ORDER_ID": "india-11params-54steps", "ZARRDIR": f"s3://india-nwp-{env}/metoffice/data", + "METOFFICE_DELAY_MINUTES":"300", }, on_failure_callback=slack_message_callback( f"⚠️🇮🇳 The {get_task_link()} failed. " From 1a388152007f71c895e27e08f92a3cf1f30b1d5d Mon Sep 17 00:00:00 2001 From: Peter Dudfield Date: Tue, 2 Sep 2025 17:31:49 +0100 Subject: [PATCH 07/10] use 0 and 12 run times, and 6 hour delay --- src/airflow_dags/dags/india/consume-nwp-dag.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/airflow_dags/dags/india/consume-nwp-dag.py b/src/airflow_dags/dags/india/consume-nwp-dag.py index 1811117..95c6e48 100644 --- a/src/airflow_dags/dags/india/consume-nwp-dag.py +++ b/src/airflow_dags/dags/india/consume-nwp-dag.py @@ -117,7 +117,8 @@ def nwp_consumer_dag() -> None: "MODEL_REPOSITORY": "metoffice-datahub", "METOFFICE_ORDER_ID": "india-11params-54steps", "ZARRDIR": f"s3://india-nwp-{env}/metoffice/data", - "METOFFICE_DELAY_MINUTES":"300", + "METOFFICE_DELAY_MINUTES":"360", # 6 hours + "MODEL": "um-global-10km-india-0-12", }, on_failure_callback=slack_message_callback( f"⚠️🇮🇳 The {get_task_link()} failed. " From 1a367e8ac4b2dad18858a71c796b677d74428a7f Mon Sep 17 00:00:00 2001 From: Peter Dudfield Date: Fri, 5 Sep 2025 14:57:17 +0100 Subject: [PATCH 08/10] remove cpu and memoery override --- .../plugins/operators/ecs_run_task_operator.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/src/airflow_dags/plugins/operators/ecs_run_task_operator.py b/src/airflow_dags/plugins/operators/ecs_run_task_operator.py index 0020e27..00ee364 100644 --- a/src/airflow_dags/plugins/operators/ecs_run_task_operator.py +++ b/src/airflow_dags/plugins/operators/ecs_run_task_operator.py @@ -30,8 +30,6 @@ def __init__( container_def: "ContainerDefinition", env_overrides: dict[str, str] | None = None, command_override: list[str] | None = None, - cpu_override: int | None = None, - memory_override: int | None = None, **kwargs: int | bool | str | dict[str, str] | list[str], ) -> None: """Create a new instance of the class.""" @@ -45,12 +43,6 @@ def __init__( if command_override: overrides_dict["command"] = command_override - if cpu_override: - overrides_dict["cpu"] = cpu_override - - if memory_override: - overrides_dict["memory"] = memory_override - networks_dict: dict[str, dict[str, list[str] | str]] = { "awsvpcConfiguration": { "subnets": [os.getenv("ECS_SUBNET", "")], From 5e981f23f4bb6077be2e874aac8b32295a1dbabb Mon Sep 17 00:00:00 2001 From: Peter Dudfield Date: Fri, 5 Sep 2025 15:07:18 +0100 Subject: [PATCH 09/10] tidy up --- src/airflow_dags/dags/india/consume-nwp-dag.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/airflow_dags/dags/india/consume-nwp-dag.py b/src/airflow_dags/dags/india/consume-nwp-dag.py index 95c6e48..e146cbf 100644 --- a/src/airflow_dags/dags/india/consume-nwp-dag.py +++ b/src/airflow_dags/dags/india/consume-nwp-dag.py @@ -117,8 +117,6 @@ def nwp_consumer_dag() -> None: "MODEL_REPOSITORY": "metoffice-datahub", "METOFFICE_ORDER_ID": "india-11params-54steps", "ZARRDIR": f"s3://india-nwp-{env}/metoffice/data", - "METOFFICE_DELAY_MINUTES":"360", # 6 hours - "MODEL": "um-global-10km-india-0-12", }, on_failure_callback=slack_message_callback( f"⚠️🇮🇳 The {get_task_link()} failed. " From a3344eaccc010117862c9bc63af3387a1d3ad0c7 Mon Sep 17 00:00:00 2001 From: Peter Dudfield Date: Mon, 15 Sep 2025 17:49:01 +0100 Subject: [PATCH 10/10] update to 1.1.32 --- src/airflow_dags/dags/india/consume-nwp-dag.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/airflow_dags/dags/india/consume-nwp-dag.py b/src/airflow_dags/dags/india/consume-nwp-dag.py index e146cbf..7ed1260 100644 --- a/src/airflow_dags/dags/india/consume-nwp-dag.py +++ b/src/airflow_dags/dags/india/consume-nwp-dag.py @@ -28,7 +28,7 @@ default_args = { "name":"nwp-consumer", "container_image":"ghcr.io/openclimatefix/nwp-consumer", - "container_tag":"no-results", + "container_tag":"1.1.32", "container_env":{ "CONCURRENCY": "false", "LOGLEVEL": "DEBUG",