From c928344cb9c3fbbd466202f616a9d08ac84cd685 Mon Sep 17 00:00:00 2001 From: salmany Date: Thu, 24 Jul 2025 00:36:22 +0000 Subject: [PATCH 01/14] Add Managed Kafka Connect code samples for clusters * Adds code examples for creating, deleting, getting, listing and updating Managed Kafka Connect clusters --- .../connect/clusters/clusters_test.py | 176 ++++++++++++++++++ .../clusters/create_connect_cluster.py | 84 +++++++++ .../clusters/delete_connect_cluster.py | 59 ++++++ .../connect/clusters/get_connect_cluster.py | 55 ++++++ .../connect/clusters/list_connect_clusters.py | 47 +++++ .../clusters/update_connect_cluster.py | 71 +++++++ 6 files changed, 492 insertions(+) create mode 100644 managedkafka/snippets/connect/clusters/clusters_test.py create mode 100644 managedkafka/snippets/connect/clusters/create_connect_cluster.py create mode 100644 managedkafka/snippets/connect/clusters/delete_connect_cluster.py create mode 100644 managedkafka/snippets/connect/clusters/get_connect_cluster.py create mode 100644 managedkafka/snippets/connect/clusters/list_connect_clusters.py create mode 100644 managedkafka/snippets/connect/clusters/update_connect_cluster.py diff --git a/managedkafka/snippets/connect/clusters/clusters_test.py b/managedkafka/snippets/connect/clusters/clusters_test.py new file mode 100644 index 00000000000..6d8c2b1c69e --- /dev/null +++ b/managedkafka/snippets/connect/clusters/clusters_test.py @@ -0,0 +1,176 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
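+
+# These tests exercise the Connect cluster snippets in this directory. Each test
+# patches the relevant ManagedKafkaConnectClient method, so no Google Cloud
+# resources are touched; assertions check the snippet's printed output and that
+# the mocked client method was invoked.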
+ +from unittest import mock +from unittest.mock import MagicMock + +from google.api_core.operation import Operation +from google.cloud import managedkafka_v1 +import pytest + +import create_connect_cluster +import delete_connect_cluster +import get_connect_cluster +import list_connect_clusters +import update_connect_cluster + +PROJECT_ID = "test-project-id" +REGION = "us-central1" +KAFKA_CLUSTER_ID = "test-cluster-id" +CONNECT_CLUSTER_ID = "test-connect-cluster-id" + + +@mock.patch( + "google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.create_connect_cluster" +) +def test_create_connect_cluster( + mock_method: MagicMock, + capsys: pytest.CaptureFixture[str], +) -> None: + cpu = 3 + memory_bytes = 3221225472 + primary_subnet = "test-subnet" + operation = mock.MagicMock(spec=Operation) + connect_cluster = managedkafka_v1.types.ConnectCluster() + connect_cluster.name = ( + managedkafka_v1.ManagedKafkaConnectClient.connect_cluster_path( + PROJECT_ID, REGION, CONNECT_CLUSTER_ID + ) + ) + operation.result = mock.MagicMock(return_value=connect_cluster) + mock_method.return_value = operation + + create_connect_cluster.create_connect_cluster( + project_id=PROJECT_ID, + region=REGION, + connect_cluster_id=CONNECT_CLUSTER_ID, + kafka_cluster_id=KAFKA_CLUSTER_ID, + primary_subnet=primary_subnet, + cpu=cpu, + memory_bytes=memory_bytes, + ) + + out, _ = capsys.readouterr() + assert "Created Connect cluster" in out + assert CONNECT_CLUSTER_ID in out + mock_method.assert_called_once() + + +@mock.patch( + "google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.get_connect_cluster" +) +def test_get_connect_cluster( + mock_method: MagicMock, + capsys: pytest.CaptureFixture[str], +) -> None: + connect_cluster = managedkafka_v1.types.ConnectCluster() + connect_cluster.name = ( + managedkafka_v1.ManagedKafkaConnectClient.connect_cluster_path( + PROJECT_ID, REGION, CONNECT_CLUSTER_ID + ) + ) + mock_method.return_value = connect_cluster + + get_connect_cluster.get_connect_cluster( + project_id=PROJECT_ID, + region=REGION, + connect_cluster_id=CONNECT_CLUSTER_ID, + ) + + out, _ = capsys.readouterr() + assert "Got Connect cluster" in out + assert CONNECT_CLUSTER_ID in out + mock_method.assert_called_once() + + +@mock.patch( + "google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.update_connect_cluster" +) +def test_update_connect_cluster( + mock_method: MagicMock, + capsys: pytest.CaptureFixture[str], +) -> None: + new_memory_bytes = 3221225475 + operation = mock.MagicMock(spec=Operation) + connect_cluster = managedkafka_v1.types.ConnectCluster() + connect_cluster.name = ( + managedkafka_v1.ManagedKafkaConnectClient.connect_cluster_path( + PROJECT_ID, REGION, CONNECT_CLUSTER_ID + ) + ) + connect_cluster.capacity_config.memory_bytes = new_memory_bytes + operation.result = mock.MagicMock(return_value=connect_cluster) + mock_method.return_value = operation + + update_connect_cluster.update_connect_cluster( + project_id=PROJECT_ID, + region=REGION, + connect_cluster_id=CONNECT_CLUSTER_ID, + memory_bytes=new_memory_bytes, + ) + + out, _ = capsys.readouterr() + assert "Updated Connect cluster" in out + assert CONNECT_CLUSTER_ID in out + assert str(new_memory_bytes) in out + mock_method.assert_called_once() + + +@mock.patch( + "google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.list_connect_clusters" +) +def test_list_connect_clusters( + mock_method: MagicMock, + capsys: 
pytest.CaptureFixture[str], +) -> None: + connect_cluster = managedkafka_v1.types.ConnectCluster() + connect_cluster.name = ( + managedkafka_v1.ManagedKafkaConnectClient.connect_cluster_path( + PROJECT_ID, REGION, CONNECT_CLUSTER_ID + ) + ) + + response = [connect_cluster] + mock_method.return_value = response + + list_connect_clusters.list_connect_clusters( + project_id=PROJECT_ID, + region=REGION, + ) + + out, _ = capsys.readouterr() + assert "Got Connect cluster" in out + assert CONNECT_CLUSTER_ID in out + mock_method.assert_called_once() + + +@mock.patch( + "google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.delete_connect_cluster" +) +def test_delete_connect_cluster( + mock_method: MagicMock, + capsys: pytest.CaptureFixture[str], +) -> None: + operation = mock.MagicMock(spec=Operation) + mock_method.return_value = operation + + delete_connect_cluster.delete_connect_cluster( + project_id=PROJECT_ID, + region=REGION, + connect_cluster_id=CONNECT_CLUSTER_ID, + ) + + out, _ = capsys.readouterr() + assert "Deleted Connect cluster" in out + mock_method.assert_called_once() diff --git a/managedkafka/snippets/connect/clusters/create_connect_cluster.py b/managedkafka/snippets/connect/clusters/create_connect_cluster.py new file mode 100644 index 00000000000..174e9fc94b5 --- /dev/null +++ b/managedkafka/snippets/connect/clusters/create_connect_cluster.py @@ -0,0 +1,84 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def create_connect_cluster( + project_id: str, + region: str, + connect_cluster_id: str, + kafka_cluster_id: str, + primary_subnet: str, + cpu: int, + memory_bytes: int, +) -> None: + """ + Create a Kafka Connect cluster. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + connect_cluster_id: ID of the Kafka Connect cluster. + kafka_cluster_id: The ID of the primary Managed Service for Apache Kafka cluster. + primary_subnet: The primary VPC subnet for the Connect cluster workers. The expected format is projects/{project_id}/regions/{region}/subnetworks/{subnet_id}. + cpu: Number of vCPUs to provision for the cluster. The minimum is 3. + memory_bytes: The memory to provision for the cluster in bytes. Must be between 1 GiB * cpu and 8 GiB * cpu. + + Raises: + This method will raise the GoogleAPICallError exception if the operation errors or + the timeout before the operation completes is reached. + """ + # [START managedkafka_create_connect_cluster] + from google.api_core.exceptions import GoogleAPICallError + from google.cloud import managedkafka_v1 + from google.cloud.managedkafka_v1.services.managed_kafka_connect import ManagedKafkaConnectClient + from google.cloud.managedkafka_v1.types import ConnectCluster, CreateConnectClusterRequest, ConnectNetworkConfig + + # TODO(developer): Update with your values. 
+ # project_id = "my-project-id" + # region = "us-central1" + # connect_cluster_id = "my-connect-cluster" + # kafka_cluster_id = "my-kafka-cluster" + # subnet = "projects/my-project-id/regions/us-central1/subnetworks/default" + # cpu = 3 + # memory_bytes = 3221225472 # 3 GiB + + connect_client = ManagedKafkaConnectClient() + kafka_client = managedkafka_v1.ManagedKafkaClient() + + parent = connect_client.common_location_path(project_id, region) + kafka_cluster_path = kafka_client.cluster_path(project_id, region, kafka_cluster_id) + + connect_cluster = ConnectCluster() + connect_cluster.name = connect_client.connect_cluster_path(project_id, region, connect_cluster_id) + connect_cluster.kafka_cluster = kafka_cluster_path + connect_cluster.capacity_config.vcpu_count = cpu + connect_cluster.capacity_config.memory_bytes = memory_bytes + connect_cluster.gcp_config.access_config.network_configs = [ConnectNetworkConfig(primary_subnet=primary_subnet)] + + request = CreateConnectClusterRequest( + parent=parent, + connect_cluster_id=connect_cluster_id, + connect_cluster=connect_cluster, + ) + + try: + operation = connect_client.create_connect_cluster(request=request) + print(f"Waiting for operation {operation.operation.name} to complete...") + # Creating a Connect cluster can take 20-30 minutes. + response = operation.result(timeout=1800) + print("Created Connect cluster:", response) + except GoogleAPICallError as e: + print(f"The operation failed with error: {e.message}") + + # [END managedkafka_create_connect_cluster] \ No newline at end of file diff --git a/managedkafka/snippets/connect/clusters/delete_connect_cluster.py b/managedkafka/snippets/connect/clusters/delete_connect_cluster.py new file mode 100644 index 00000000000..642a6cdac20 --- /dev/null +++ b/managedkafka/snippets/connect/clusters/delete_connect_cluster.py @@ -0,0 +1,59 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def delete_connect_cluster( + project_id: str, + region: str, + connect_cluster_id: str, +) -> None: + """ + Delete a Kafka Connect cluster. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + connect_cluster_id: ID of the Kafka Connect cluster. + + Raises: + This method will raise the GoogleAPICallError exception if the operation errors or + the timeout before the operation completes is reached. 
+ """ + # [START managedkafka_delete_connect_cluster] + from google.api_core.exceptions import GoogleAPICallError + from google.cloud.managedkafka_v1.services.managed_kafka_connect import ( + ManagedKafkaConnectClient, + ) + from google.cloud import managedkafka_v1 + + # TODO(developer) + # project_id = "my-project-id" + # region = "us-central1" + # connect_cluster_id = "my-connect-cluster" + + client = ManagedKafkaConnectClient() + + request = managedkafka_v1.DeleteConnectClusterRequest( + name=client.connect_cluster_path(project_id, region, connect_cluster_id), + ) + + try: + operation = client.delete_connect_cluster(request=request) + print(f"Waiting for operation {operation.operation.name} to complete...") + operation.result() + print("Deleted Connect cluster") + except GoogleAPICallError as e: + print(f"The operation failed with error: {e.message}") + + # [END managedkafka_delete_connect_cluster] diff --git a/managedkafka/snippets/connect/clusters/get_connect_cluster.py b/managedkafka/snippets/connect/clusters/get_connect_cluster.py new file mode 100644 index 00000000000..cd8adefbf72 --- /dev/null +++ b/managedkafka/snippets/connect/clusters/get_connect_cluster.py @@ -0,0 +1,55 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def get_connect_cluster( + project_id: str, + region: str, + connect_cluster_id: str, +) -> None: + """ + Get a Kafka Connect cluster. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + connect_cluster_id: ID of the Kafka Connect cluster. + + Raises: + This method will raise the NotFound exception if the Connect cluster is not found. + """ + # [START managedkafka_get_connect_cluster] + from google.api_core.exceptions import NotFound + from google.cloud.managedkafka_v1.services.managed_kafka_connect import ManagedKafkaConnectClient + from google.cloud import managedkafka_v1 + + # TODO(developer) + # project_id = "my-project-id" + # region = "us-central1" + # connect_cluster_id = "my-connect-cluster" + + client = ManagedKafkaConnectClient() + + cluster_path = client.connect_cluster_path(project_id, region, connect_cluster_id) + request = managedkafka_v1.GetConnectClusterRequest( + name=cluster_path, + ) + + try: + cluster = client.get_connect_cluster(request=request) + print("Got Connect cluster:", cluster) + except NotFound as e: + print(f"Failed to get Connect cluster {connect_cluster_id} with error: {e.message}") + + # [END managedkafka_get_connect_cluster] diff --git a/managedkafka/snippets/connect/clusters/list_connect_clusters.py b/managedkafka/snippets/connect/clusters/list_connect_clusters.py new file mode 100644 index 00000000000..049c23b3cdc --- /dev/null +++ b/managedkafka/snippets/connect/clusters/list_connect_clusters.py @@ -0,0 +1,47 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def list_connect_clusters( + project_id: str, + region: str, +) -> None: + """ + List Kafka Connect clusters in a given project ID and region. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + """ + # [START managedkafka_list_connect_clusters] + from google.cloud import managedkafka_v1 + from google.cloud.managedkafka_v1.services.managed_kafka_connect import ( + ManagedKafkaConnectClient, + ) + + # TODO(developer) + # project_id = "my-project-id" + # region = "us-central1" + + connect_client = ManagedKafkaConnectClient() + + request = managedkafka_v1.ListConnectClustersRequest( + parent=connect_client.common_location_path(project_id, region), + ) + + response = connect_client.list_connect_clusters(request=request) + for cluster in response: + print("Got Connect cluster:", cluster) + + # [END managedkafka_list_connect_clusters] diff --git a/managedkafka/snippets/connect/clusters/update_connect_cluster.py b/managedkafka/snippets/connect/clusters/update_connect_cluster.py new file mode 100644 index 00000000000..722136956e3 --- /dev/null +++ b/managedkafka/snippets/connect/clusters/update_connect_cluster.py @@ -0,0 +1,71 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def update_connect_cluster( + project_id: str, region: str, connect_cluster_id: str, memory_bytes: int +) -> None: + """ + Update a Kafka Connect cluster. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + connect_cluster_id: ID of the Kafka Connect cluster. + memory_bytes: The memory to provision for the cluster in bytes. + + Raises: + This method will raise the GoogleAPICallError exception if the operation errors or + the timeout before the operation completes is reached. 
+ """ + # [START managedkafka_update_connect_cluster] + from google.api_core.exceptions import GoogleAPICallError + from google.cloud import managedkafka_v1 + from google.cloud.managedkafka_v1.services.managed_kafka_connect import ( + ManagedKafkaConnectClient, + ) + from google.cloud.managedkafka_v1.types import ConnectCluster + from google.protobuf import field_mask_pb2 + + # TODO(developer) + # project_id = "my-project-id" + # region = "us-central1" + # connect_cluster_id = "my-connect-cluster" + # memory_bytes = 4295000000 + + connect_client = ManagedKafkaConnectClient() + + connect_cluster = ConnectCluster() + connect_cluster.name = connect_client.connect_cluster_path( + project_id, region, connect_cluster_id + ) + connect_cluster.capacity_config.memory_bytes = memory_bytes + update_mask = field_mask_pb2.FieldMask() + update_mask.paths.append("capacity_config.memory_bytes") + + # For a list of editable fields, one can check https://cloud.google.com/managed-service-for-apache-kafka/docs/connect-cluster/create-connect-cluster#properties. + request = managedkafka_v1.UpdateConnectClusterRequest( + update_mask=update_mask, + connect_cluster=connect_cluster, + ) + + try: + operation = connect_client.update_connect_cluster(request=request) + print(f"Waiting for operation {operation.operation.name} to complete...") + response = operation.result() + print("Updated Connect cluster:", response) + except GoogleAPICallError as e: + print(f"The operation failed with error: {e.message}") + + # [END managedkafka_update_connect_cluster] From 8f2224a4c7104a3eab01faf7ada6c04851799bf4 Mon Sep 17 00:00:00 2001 From: salmany Date: Thu, 24 Jul 2025 00:42:16 +0000 Subject: [PATCH 02/14] Update google-cloud-managedkafka version to 0.1.12 --- managedkafka/snippets/requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/managedkafka/snippets/requirements.txt b/managedkafka/snippets/requirements.txt index a7da4ff6516..5f372e81c41 100644 --- a/managedkafka/snippets/requirements.txt +++ b/managedkafka/snippets/requirements.txt @@ -2,5 +2,5 @@ protobuf==5.29.4 pytest==8.2.2 google-api-core==2.23.0 google-auth==2.38.0 -google-cloud-managedkafka==0.1.5 +google-cloud-managedkafka==0.1.12 googleapis-common-protos==1.66.0 From 1feb3544a25608ba003dede93ccf1ffa4db967a1 Mon Sep 17 00:00:00 2001 From: Salman Yousaf <37085288+salmany@users.noreply.github.com> Date: Wed, 23 Jul 2025 20:50:00 -0400 Subject: [PATCH 03/14] Update managedkafka/snippets/connect/clusters/create_connect_cluster.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- .../snippets/connect/clusters/create_connect_cluster.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/managedkafka/snippets/connect/clusters/create_connect_cluster.py b/managedkafka/snippets/connect/clusters/create_connect_cluster.py index 174e9fc94b5..1d3cf0a9284 100644 --- a/managedkafka/snippets/connect/clusters/create_connect_cluster.py +++ b/managedkafka/snippets/connect/clusters/create_connect_cluster.py @@ -49,7 +49,7 @@ def create_connect_cluster( # region = "us-central1" # connect_cluster_id = "my-connect-cluster" # kafka_cluster_id = "my-kafka-cluster" - # subnet = "projects/my-project-id/regions/us-central1/subnetworks/default" + # primary_subnet = "projects/my-project-id/regions/us-central1/subnetworks/default" # cpu = 3 # memory_bytes = 3221225472 # 3 GiB From 5840336f20298fce760ca3803048b2b423a9551a Mon Sep 17 00:00:00 2001 From: Salman Yousaf 
<37085288+salmany@users.noreply.github.com> Date: Wed, 23 Jul 2025 20:50:24 -0400 Subject: [PATCH 04/14] Update managedkafka/snippets/connect/clusters/create_connect_cluster.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- .../snippets/connect/clusters/create_connect_cluster.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/managedkafka/snippets/connect/clusters/create_connect_cluster.py b/managedkafka/snippets/connect/clusters/create_connect_cluster.py index 1d3cf0a9284..004ed8610bd 100644 --- a/managedkafka/snippets/connect/clusters/create_connect_cluster.py +++ b/managedkafka/snippets/connect/clusters/create_connect_cluster.py @@ -79,6 +79,6 @@ def create_connect_cluster( response = operation.result(timeout=1800) print("Created Connect cluster:", response) except GoogleAPICallError as e: - print(f"The operation failed with error: {e.message}") + print(f"The operation failed with error: {e}") # [END managedkafka_create_connect_cluster] \ No newline at end of file From d0df10fa42b6e6048b173cf88a3de524515c4d47 Mon Sep 17 00:00:00 2001 From: Salman Yousaf <37085288+salmany@users.noreply.github.com> Date: Wed, 23 Jul 2025 20:50:35 -0400 Subject: [PATCH 05/14] Update managedkafka/snippets/connect/clusters/delete_connect_cluster.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- .../snippets/connect/clusters/delete_connect_cluster.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/managedkafka/snippets/connect/clusters/delete_connect_cluster.py b/managedkafka/snippets/connect/clusters/delete_connect_cluster.py index 642a6cdac20..f05bc1fbdcf 100644 --- a/managedkafka/snippets/connect/clusters/delete_connect_cluster.py +++ b/managedkafka/snippets/connect/clusters/delete_connect_cluster.py @@ -54,6 +54,6 @@ def delete_connect_cluster( operation.result() print("Deleted Connect cluster") except GoogleAPICallError as e: - print(f"The operation failed with error: {e.message}") + print(f"The operation failed with error: {e}") # [END managedkafka_delete_connect_cluster] From 505759a4cf37c1359b6217bbc87cad629fff5923 Mon Sep 17 00:00:00 2001 From: Salman Yousaf <37085288+salmany@users.noreply.github.com> Date: Wed, 23 Jul 2025 20:50:42 -0400 Subject: [PATCH 06/14] Update managedkafka/snippets/connect/clusters/get_connect_cluster.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- managedkafka/snippets/connect/clusters/get_connect_cluster.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/managedkafka/snippets/connect/clusters/get_connect_cluster.py b/managedkafka/snippets/connect/clusters/get_connect_cluster.py index cd8adefbf72..8dfd39b5958 100644 --- a/managedkafka/snippets/connect/clusters/get_connect_cluster.py +++ b/managedkafka/snippets/connect/clusters/get_connect_cluster.py @@ -50,6 +50,6 @@ def get_connect_cluster( cluster = client.get_connect_cluster(request=request) print("Got Connect cluster:", cluster) except NotFound as e: - print(f"Failed to get Connect cluster {connect_cluster_id} with error: {e.message}") + print(f"Failed to get Connect cluster {connect_cluster_id} with error: {e}") # [END managedkafka_get_connect_cluster] From d3fba2c9784ad8f49e9b8941bbf8101515de8e65 Mon Sep 17 00:00:00 2001 From: Salman Yousaf <37085288+salmany@users.noreply.github.com> Date: Wed, 23 Jul 2025 20:51:04 -0400 Subject: [PATCH 07/14] Update 
managedkafka/snippets/connect/clusters/list_connect_clusters.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- .../snippets/connect/clusters/list_connect_clusters.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/managedkafka/snippets/connect/clusters/list_connect_clusters.py b/managedkafka/snippets/connect/clusters/list_connect_clusters.py index 049c23b3cdc..615626017db 100644 --- a/managedkafka/snippets/connect/clusters/list_connect_clusters.py +++ b/managedkafka/snippets/connect/clusters/list_connect_clusters.py @@ -42,6 +42,11 @@ def list_connect_clusters( response = connect_client.list_connect_clusters(request=request) for cluster in response: - print("Got Connect cluster:", cluster) + try: + response = connect_client.list_connect_clusters(request=request) + for cluster in response: + print("Got Connect cluster:", cluster) + except GoogleAPICallError as e: + print(f"Failed to list Connect clusters with error: {e}") # [END managedkafka_list_connect_clusters] From 0a85a8bf9d725d7c7a0b310b5f6032110e558f4b Mon Sep 17 00:00:00 2001 From: Salman Yousaf <37085288+salmany@users.noreply.github.com> Date: Wed, 23 Jul 2025 20:51:16 -0400 Subject: [PATCH 08/14] Update managedkafka/snippets/connect/clusters/update_connect_cluster.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- .../snippets/connect/clusters/update_connect_cluster.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/managedkafka/snippets/connect/clusters/update_connect_cluster.py b/managedkafka/snippets/connect/clusters/update_connect_cluster.py index 722136956e3..36ec1c5ec06 100644 --- a/managedkafka/snippets/connect/clusters/update_connect_cluster.py +++ b/managedkafka/snippets/connect/clusters/update_connect_cluster.py @@ -66,6 +66,6 @@ def update_connect_cluster( response = operation.result() print("Updated Connect cluster:", response) except GoogleAPICallError as e: - print(f"The operation failed with error: {e.message}") + print(f"The operation failed with error: {e}") # [END managedkafka_update_connect_cluster] From dfa07d2ac8ccb59e61878e154a4ba0d8f7f859b8 Mon Sep 17 00:00:00 2001 From: salmany Date: Thu, 24 Jul 2025 01:00:43 +0000 Subject: [PATCH 09/14] Add timeouts and improve error handling. --- .../snippets/connect/clusters/create_connect_cluster.py | 7 ++++--- .../snippets/connect/clusters/delete_connect_cluster.py | 3 ++- .../snippets/connect/clusters/list_connect_clusters.py | 9 ++++----- .../snippets/connect/clusters/update_connect_cluster.py | 2 ++ 4 files changed, 12 insertions(+), 9 deletions(-) diff --git a/managedkafka/snippets/connect/clusters/create_connect_cluster.py b/managedkafka/snippets/connect/clusters/create_connect_cluster.py index 004ed8610bd..6dfc218e684 100644 --- a/managedkafka/snippets/connect/clusters/create_connect_cluster.py +++ b/managedkafka/snippets/connect/clusters/create_connect_cluster.py @@ -75,10 +75,11 @@ def create_connect_cluster( try: operation = connect_client.create_connect_cluster(request=request) print(f"Waiting for operation {operation.operation.name} to complete...") - # Creating a Connect cluster can take 20-30 minutes. - response = operation.result(timeout=1800) + # Creating a Connect cluster can take 10-40 minutes. 
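+        # The 3000-second timeout below leaves headroom beyond that range;
+        # adjust it for your environment.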
+ response = operation.result(timeout=3000) print("Created Connect cluster:", response) except GoogleAPICallError as e: print(f"The operation failed with error: {e}") - # [END managedkafka_create_connect_cluster] \ No newline at end of file + # [END managedkafka_create_connect_cluster] + \ No newline at end of file diff --git a/managedkafka/snippets/connect/clusters/delete_connect_cluster.py b/managedkafka/snippets/connect/clusters/delete_connect_cluster.py index f05bc1fbdcf..b044fd2b047 100644 --- a/managedkafka/snippets/connect/clusters/delete_connect_cluster.py +++ b/managedkafka/snippets/connect/clusters/delete_connect_cluster.py @@ -51,7 +51,8 @@ def delete_connect_cluster( try: operation = client.delete_connect_cluster(request=request) print(f"Waiting for operation {operation.operation.name} to complete...") - operation.result() + # Deleting a Connect cluster can take 10-40 minutes. + operation.result(timeout=3000) print("Deleted Connect cluster") except GoogleAPICallError as e: print(f"The operation failed with error: {e}") diff --git a/managedkafka/snippets/connect/clusters/list_connect_clusters.py b/managedkafka/snippets/connect/clusters/list_connect_clusters.py index 615626017db..749a5267d91 100644 --- a/managedkafka/snippets/connect/clusters/list_connect_clusters.py +++ b/managedkafka/snippets/connect/clusters/list_connect_clusters.py @@ -29,6 +29,7 @@ def list_connect_clusters( from google.cloud.managedkafka_v1.services.managed_kafka_connect import ( ManagedKafkaConnectClient, ) + from google.api_core.exceptions import GoogleAPICallError # TODO(developer) # project_id = "my-project-id" @@ -42,11 +43,9 @@ def list_connect_clusters( response = connect_client.list_connect_clusters(request=request) for cluster in response: - try: - response = connect_client.list_connect_clusters(request=request) - for cluster in response: + try: print("Got Connect cluster:", cluster) - except GoogleAPICallError as e: - print(f"Failed to list Connect clusters with error: {e}") + except GoogleAPICallError as e: + print(f"Failed to list Connect clusters with error: {e}") # [END managedkafka_list_connect_clusters] diff --git a/managedkafka/snippets/connect/clusters/update_connect_cluster.py b/managedkafka/snippets/connect/clusters/update_connect_cluster.py index 36ec1c5ec06..f7ed0d0248e 100644 --- a/managedkafka/snippets/connect/clusters/update_connect_cluster.py +++ b/managedkafka/snippets/connect/clusters/update_connect_cluster.py @@ -63,6 +63,8 @@ def update_connect_cluster( try: operation = connect_client.update_connect_cluster(request=request) print(f"Waiting for operation {operation.operation.name} to complete...") + # Updating a Connect cluster can take 10-40 minutes. + operation.result(timeout=3000) response = operation.result() print("Updated Connect cluster:", response) except GoogleAPICallError as e: From ab697bfab4b1f26ff4ed5e0d9226fafe9800d889 Mon Sep 17 00:00:00 2001 From: salmany Date: Mon, 28 Jul 2025 19:44:31 +0000 Subject: [PATCH 10/14] Addressed PR comments. 
--- .../snippets/connect/clusters/clusters_test.py | 6 +++--- .../connect/clusters/create_connect_cluster.py | 10 +++++++++- .../connect/clusters/delete_connect_cluster.py | 6 ++---- 3 files changed, 14 insertions(+), 8 deletions(-) diff --git a/managedkafka/snippets/connect/clusters/clusters_test.py b/managedkafka/snippets/connect/clusters/clusters_test.py index 6d8c2b1c69e..26fea584393 100644 --- a/managedkafka/snippets/connect/clusters/clusters_test.py +++ b/managedkafka/snippets/connect/clusters/clusters_test.py @@ -38,8 +38,8 @@ def test_create_connect_cluster( mock_method: MagicMock, capsys: pytest.CaptureFixture[str], ) -> None: - cpu = 3 - memory_bytes = 3221225472 + cpu = 12 + memory_bytes = 12884901900 # 12 GB primary_subnet = "test-subnet" operation = mock.MagicMock(spec=Operation) connect_cluster = managedkafka_v1.types.ConnectCluster() @@ -101,7 +101,7 @@ def test_update_connect_cluster( mock_method: MagicMock, capsys: pytest.CaptureFixture[str], ) -> None: - new_memory_bytes = 3221225475 + new_memory_bytes = 12884901900 # 12 GB operation = mock.MagicMock(spec=Operation) connect_cluster = managedkafka_v1.types.ConnectCluster() connect_cluster.name = ( diff --git a/managedkafka/snippets/connect/clusters/create_connect_cluster.py b/managedkafka/snippets/connect/clusters/create_connect_cluster.py index 6dfc218e684..3187841e86a 100644 --- a/managedkafka/snippets/connect/clusters/create_connect_cluster.py +++ b/managedkafka/snippets/connect/clusters/create_connect_cluster.py @@ -65,6 +65,15 @@ def create_connect_cluster( connect_cluster.capacity_config.vcpu_count = cpu connect_cluster.capacity_config.memory_bytes = memory_bytes connect_cluster.gcp_config.access_config.network_configs = [ConnectNetworkConfig(primary_subnet=primary_subnet)] + # Optionally, you can also specify accessible subnets and resolvable DNS domains as part of your network configuration. + # For example: + # connect_cluster.gcp_config.access_config.network_configs = [ + # ConnectNetworkConfig( + # primary_subnet=primary_subnet, + # additional_subnets=additional_subnets, + # dns_domain_names=dns_domain_names, + # ) + # ] request = CreateConnectClusterRequest( parent=parent, @@ -82,4 +91,3 @@ def create_connect_cluster( print(f"The operation failed with error: {e}") # [END managedkafka_create_connect_cluster] - \ No newline at end of file diff --git a/managedkafka/snippets/connect/clusters/delete_connect_cluster.py b/managedkafka/snippets/connect/clusters/delete_connect_cluster.py index b044fd2b047..84258fe830f 100644 --- a/managedkafka/snippets/connect/clusters/delete_connect_cluster.py +++ b/managedkafka/snippets/connect/clusters/delete_connect_cluster.py @@ -27,8 +27,7 @@ def delete_connect_cluster( connect_cluster_id: ID of the Kafka Connect cluster. Raises: - This method will raise the GoogleAPICallError exception if the operation errors or - the timeout before the operation completes is reached. + This method will raise the GoogleAPICallError exception if the operation errors. """ # [START managedkafka_delete_connect_cluster] from google.api_core.exceptions import GoogleAPICallError @@ -51,8 +50,7 @@ def delete_connect_cluster( try: operation = client.delete_connect_cluster(request=request) print(f"Waiting for operation {operation.operation.name} to complete...") - # Deleting a Connect cluster can take 10-40 minutes. 
- operation.result(timeout=3000) + operation.result() print("Deleted Connect cluster") except GoogleAPICallError as e: print(f"The operation failed with error: {e}") From 333932509b10373dea0ae9d5746f1f662e0737ee Mon Sep 17 00:00:00 2001 From: salmany Date: Tue, 29 Jul 2025 01:00:38 +0000 Subject: [PATCH 11/14] Adds requirements.txt file for samples. As per the [Authoring Guide] (https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/AUTHORING_GUIDE.md#dependencies), each sample is required to have a requirements.txt file that lists the dependencies needed to run the sample. --- managedkafka/snippets/connect/clusters/requirements.txt | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 managedkafka/snippets/connect/clusters/requirements.txt diff --git a/managedkafka/snippets/connect/clusters/requirements.txt b/managedkafka/snippets/connect/clusters/requirements.txt new file mode 100644 index 00000000000..5f372e81c41 --- /dev/null +++ b/managedkafka/snippets/connect/clusters/requirements.txt @@ -0,0 +1,6 @@ +protobuf==5.29.4 +pytest==8.2.2 +google-api-core==2.23.0 +google-auth==2.38.0 +google-cloud-managedkafka==0.1.12 +googleapis-common-protos==1.66.0 From f1bb355eda59552aeabdad103cbc4fa1acced91f Mon Sep 17 00:00:00 2001 From: salmany Date: Wed, 30 Jul 2025 22:01:52 +0000 Subject: [PATCH 12/14] Adds code examples for Managed Kafka Connect Connectors: * Creating BigQuery Sink Connector * Creating MirrorMaker Source Connector * Creating Pub/Sub Sink Connector * Creating Pub/Sub Source Connector * Creating Storage Sink Connector * Getting, updating, deleting, and listing connectors * Starting, stopping, pausing, restarting and resuming connectors --- .../connect/connectors/connectors_test.py | 421 ++++++++++++++++++ .../create_bigquery_sink_connector.py | 89 ++++ .../create_mirrormaker_connector.py | 99 ++++ .../create_pubsub_sink_connector.py | 95 ++++ .../create_pubsub_source_connector.py | 95 ++++ .../create_storage_sink_connector.py | 90 ++++ .../connect/connectors/delete_connector.py | 61 +++ .../connect/connectors/get_connector.py | 60 +++ .../connect/connectors/list_connectors.py | 54 +++ .../connect/connectors/pause_connector.py | 61 +++ .../connect/connectors/requirements.txt | 6 + .../connect/connectors/restart_connector.py | 61 +++ .../connect/connectors/resume_connector.py | 61 +++ .../connect/connectors/stop_connector.py | 61 +++ .../connect/connectors/update_connector.py | 79 ++++ 15 files changed, 1393 insertions(+) create mode 100644 managedkafka/snippets/connect/connectors/connectors_test.py create mode 100644 managedkafka/snippets/connect/connectors/create_bigquery_sink_connector.py create mode 100644 managedkafka/snippets/connect/connectors/create_mirrormaker_connector.py create mode 100644 managedkafka/snippets/connect/connectors/create_pubsub_sink_connector.py create mode 100644 managedkafka/snippets/connect/connectors/create_pubsub_source_connector.py create mode 100644 managedkafka/snippets/connect/connectors/create_storage_sink_connector.py create mode 100644 managedkafka/snippets/connect/connectors/delete_connector.py create mode 100644 managedkafka/snippets/connect/connectors/get_connector.py create mode 100644 managedkafka/snippets/connect/connectors/list_connectors.py create mode 100644 managedkafka/snippets/connect/connectors/pause_connector.py create mode 100644 managedkafka/snippets/connect/connectors/requirements.txt create mode 100644 managedkafka/snippets/connect/connectors/restart_connector.py create mode 100644 
managedkafka/snippets/connect/connectors/resume_connector.py create mode 100644 managedkafka/snippets/connect/connectors/stop_connector.py create mode 100644 managedkafka/snippets/connect/connectors/update_connector.py diff --git a/managedkafka/snippets/connect/connectors/connectors_test.py b/managedkafka/snippets/connect/connectors/connectors_test.py new file mode 100644 index 00000000000..1f1cf8f13d3 --- /dev/null +++ b/managedkafka/snippets/connect/connectors/connectors_test.py @@ -0,0 +1,421 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from unittest import mock +from unittest.mock import MagicMock + +from google.api_core.operation import Operation +from google.cloud import managedkafka_v1 +import pytest + +import create_bigquery_sink_connector +import create_mirrormaker_connector +import create_pubsub_sink_connector +import create_pubsub_source_connector +import create_storage_sink_connector +import delete_connector +import get_connector +import list_connectors +import pause_connector +import restart_connector +import resume_connector +import stop_connector +import update_connector + +PROJECT_ID = "test-project-id" +REGION = "us-central1" +CONNECT_CLUSTER_ID = "test-connect-cluster-id" +CONNECTOR_ID = "test-connector-id" +KAFKA_TOPIC = "test-topic" + + +@mock.patch( + "google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.create_connector" +) +def test_create_mirrormaker_connector( + mock_method: MagicMock, + capsys: pytest.CaptureFixture[str], +) -> None: + connector_id = "test-mirrormaker" + source_cluster_dns = "source-cluster.example.com:9092" + target_cluster_dns = "target-cluster.example.com:9092" + operation = mock.MagicMock(spec=Operation) + connector = managedkafka_v1.types.Connector() + connector.name = ( + managedkafka_v1.ManagedKafkaConnectClient.connector_path( + PROJECT_ID, REGION, CONNECT_CLUSTER_ID, connector_id + ) + ) + operation.result = mock.MagicMock(return_value=connector) + mock_method.return_value = operation + + create_mirrormaker_connector.create_mirrormaker_connector( + project_id=PROJECT_ID, + region=REGION, + connect_cluster_id=CONNECT_CLUSTER_ID, + connector_id=connector_id, + source_cluster_dns=source_cluster_dns, + target_cluster_dns=target_cluster_dns, + topic_name=KAFKA_TOPIC, + ) + + out, _ = capsys.readouterr() + assert "Created MirrorMaker connector" in out + assert connector_id in out + mock_method.assert_called_once() + + +@mock.patch( + "google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.create_connector" +) +def test_create_pubsub_source_connector( + mock_method: MagicMock, + capsys: pytest.CaptureFixture[str], +) -> None: + connector_id = "test-pubsub-source" + subscription_id = "test-subscription" + operation = mock.MagicMock(spec=Operation) + connector = managedkafka_v1.types.Connector() + connector.name = ( + managedkafka_v1.ManagedKafkaConnectClient.connector_path( + PROJECT_ID, REGION, CONNECT_CLUSTER_ID, connector_id + ) + ) + 
operation.result = mock.MagicMock(return_value=connector) + mock_method.return_value = operation + + create_pubsub_source_connector.create_pubsub_source_connector( + project_id=PROJECT_ID, + region=REGION, + connect_cluster_id=CONNECT_CLUSTER_ID, + connector_id=connector_id, + kafka_topic=KAFKA_TOPIC, + subscription_id=subscription_id, + ) + + out, _ = capsys.readouterr() + assert "Created Pub/Sub source connector" in out + assert connector_id in out + mock_method.assert_called_once() + + +@mock.patch( + "google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.create_connector" +) +def test_create_pubsub_sink_connector( + mock_method: MagicMock, + capsys: pytest.CaptureFixture[str], +) -> None: + connector_id = "test-pubsub-sink" + pubsub_topic_id = "test-pubsub-topic" + operation = mock.MagicMock(spec=Operation) + connector = managedkafka_v1.types.Connector() + connector.name = ( + managedkafka_v1.ManagedKafkaConnectClient.connector_path( + PROJECT_ID, REGION, CONNECT_CLUSTER_ID, connector_id + ) + ) + operation.result = mock.MagicMock(return_value=connector) + mock_method.return_value = operation + + create_pubsub_sink_connector.create_pubsub_sink_connector( + project_id=PROJECT_ID, + region=REGION, + connect_cluster_id=CONNECT_CLUSTER_ID, + connector_id=connector_id, + kafka_topic=KAFKA_TOPIC, + pubsub_topic_id=pubsub_topic_id, + ) + + out, _ = capsys.readouterr() + assert "Created Pub/Sub sink connector" in out + assert connector_id in out + mock_method.assert_called_once() + + +@mock.patch( + "google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.create_connector" +) +def test_create_storage_sink_connector( + mock_method: MagicMock, + capsys: pytest.CaptureFixture[str], +) -> None: + connector_id = "test-gcs-sink" + bucket_name = "test-bucket" + operation = mock.MagicMock(spec=Operation) + connector = managedkafka_v1.types.Connector() + connector.name = ( + managedkafka_v1.ManagedKafkaConnectClient.connector_path( + PROJECT_ID, REGION, CONNECT_CLUSTER_ID, connector_id + ) + ) + operation.result = mock.MagicMock(return_value=connector) + mock_method.return_value = operation + + create_storage_sink_connector.create_storage_sink_connector( + project_id=PROJECT_ID, + region=REGION, + connect_cluster_id=CONNECT_CLUSTER_ID, + connector_id=connector_id, + kafka_topic=KAFKA_TOPIC, + bucket_name=bucket_name, + ) + + out, _ = capsys.readouterr() + assert "Created Cloud Storage sink connector" in out + assert connector_id in out + mock_method.assert_called_once() + + +@mock.patch( + "google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.create_connector" +) +def test_create_bigquery_sink_connector( + mock_method: MagicMock, + capsys: pytest.CaptureFixture[str], +) -> None: + connector_id = "test-bq-sink" + dataset_id = "test_dataset" + operation = mock.MagicMock(spec=Operation) + connector = managedkafka_v1.types.Connector() + connector.name = ( + managedkafka_v1.ManagedKafkaConnectClient.connector_path( + PROJECT_ID, REGION, CONNECT_CLUSTER_ID, connector_id + ) + ) + operation.result = mock.MagicMock(return_value=connector) + mock_method.return_value = operation + + create_bigquery_sink_connector.create_bigquery_sink_connector( + project_id=PROJECT_ID, + region=REGION, + connect_cluster_id=CONNECT_CLUSTER_ID, + connector_id=connector_id, + kafka_topic=KAFKA_TOPIC, + dataset_id=dataset_id, + ) + + out, _ = capsys.readouterr() + assert "Created BigQuery sink connector" in out + assert connector_id in out 
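+    # The mocked create_connector call itself is verified below.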
+ mock_method.assert_called_once() + + +@mock.patch( + "google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.list_connectors" +) +def test_list_connectors( + mock_method: MagicMock, + capsys: pytest.CaptureFixture[str], +) -> None: + connector = managedkafka_v1.types.Connector() + connector.name = ( + managedkafka_v1.ManagedKafkaConnectClient.connector_path( + PROJECT_ID, REGION, CONNECT_CLUSTER_ID, CONNECTOR_ID + ) + ) + mock_method.return_value = [connector] + + list_connectors.list_connectors( + project_id=PROJECT_ID, + region=REGION, + connect_cluster_id=CONNECT_CLUSTER_ID, + ) + + out, _ = capsys.readouterr() + assert "Got connector" in out + assert CONNECTOR_ID in out + mock_method.assert_called_once() + + +@mock.patch( + "google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.get_connector" +) +def test_get_connector( + mock_method: MagicMock, + capsys: pytest.CaptureFixture[str], +) -> None: + connector = managedkafka_v1.types.Connector() + connector.name = ( + managedkafka_v1.ManagedKafkaConnectClient.connector_path( + PROJECT_ID, REGION, CONNECT_CLUSTER_ID, CONNECTOR_ID + ) + ) + mock_method.return_value = connector + + get_connector.get_connector( + project_id=PROJECT_ID, + region=REGION, + connect_cluster_id=CONNECT_CLUSTER_ID, + connector_id=CONNECTOR_ID, + ) + + out, _ = capsys.readouterr() + assert "Got connector" in out + assert CONNECTOR_ID in out + mock_method.assert_called_once() + + +@mock.patch( + "google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.update_connector" +) +def test_update_connector( + mock_method: MagicMock, + capsys: pytest.CaptureFixture[str], +) -> None: + configs = { + "tasks.max": "2", + "value.converter.schemas.enable" : "true" + } + operation = mock.MagicMock(spec=Operation) + connector = managedkafka_v1.types.Connector() + connector.name = ( + managedkafka_v1.ManagedKafkaConnectClient.connector_path( + PROJECT_ID, REGION, CONNECT_CLUSTER_ID, CONNECTOR_ID + ) + ) + operation.result = mock.MagicMock(return_value=connector) + mock_method.return_value = operation + + update_connector.update_connector( + project_id=PROJECT_ID, + region=REGION, + connect_cluster_id=CONNECT_CLUSTER_ID, + connector_id=CONNECTOR_ID, + configs=configs, + ) + + out, _ = capsys.readouterr() + assert "Updated connector" in out + assert CONNECTOR_ID in out + mock_method.assert_called_once() + + +@mock.patch( + "google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.delete_connector" +) +def test_delete_connector( + mock_method: MagicMock, + capsys: pytest.CaptureFixture[str], +) -> None: + operation = mock.MagicMock(spec=Operation) + operation.result = mock.MagicMock(return_value=None) + mock_method.return_value = operation + + delete_connector.delete_connector( + project_id=PROJECT_ID, + region=REGION, + connect_cluster_id=CONNECT_CLUSTER_ID, + connector_id=CONNECTOR_ID, + ) + + out, _ = capsys.readouterr() + assert "Deleted connector" in out + mock_method.assert_called_once() + + +@mock.patch( + "google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.pause_connector" +) +def test_pause_connector( + mock_method: MagicMock, + capsys: pytest.CaptureFixture[str], +) -> None: + operation = mock.MagicMock(spec=Operation) + operation.result = mock.MagicMock(return_value=None) + mock_method.return_value = operation + + pause_connector.pause_connector( + project_id=PROJECT_ID, + region=REGION, + 
connect_cluster_id=CONNECT_CLUSTER_ID, + connector_id=CONNECTOR_ID, + ) + + out, _ = capsys.readouterr() + assert "Paused connector" in out + assert CONNECTOR_ID in out + mock_method.assert_called_once() + + +@mock.patch( + "google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.resume_connector" +) +def test_resume_connector( + mock_method: MagicMock, + capsys: pytest.CaptureFixture[str], +) -> None: + operation = mock.MagicMock(spec=Operation) + operation.result = mock.MagicMock(return_value=None) + mock_method.return_value = operation + + resume_connector.resume_connector( + project_id=PROJECT_ID, + region=REGION, + connect_cluster_id=CONNECT_CLUSTER_ID, + connector_id=CONNECTOR_ID, + ) + + out, _ = capsys.readouterr() + assert "Resumed connector" in out + assert CONNECTOR_ID in out + mock_method.assert_called_once() + + +@mock.patch( + "google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.stop_connector" +) +def test_stop_connector( + mock_method: MagicMock, + capsys: pytest.CaptureFixture[str], +) -> None: + operation = mock.MagicMock(spec=Operation) + operation.result = mock.MagicMock(return_value=None) + mock_method.return_value = operation + + stop_connector.stop_connector( + project_id=PROJECT_ID, + region=REGION, + connect_cluster_id=CONNECT_CLUSTER_ID, + connector_id=CONNECTOR_ID, + ) + + out, _ = capsys.readouterr() + assert "Stopped connector" in out + assert CONNECTOR_ID in out + mock_method.assert_called_once() + + +@mock.patch( + "google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.restart_connector" +) +def test_restart_connector( + mock_method: MagicMock, + capsys: pytest.CaptureFixture[str], +) -> None: + operation = mock.MagicMock(spec=Operation) + operation.result = mock.MagicMock(return_value=None) + mock_method.return_value = operation + + restart_connector.restart_connector( + project_id=PROJECT_ID, + region=REGION, + connect_cluster_id=CONNECT_CLUSTER_ID, + connector_id=CONNECTOR_ID, + ) + + out, _ = capsys.readouterr() + assert "Restarted connector" in out + assert CONNECTOR_ID in out + mock_method.assert_called_once() diff --git a/managedkafka/snippets/connect/connectors/create_bigquery_sink_connector.py b/managedkafka/snippets/connect/connectors/create_bigquery_sink_connector.py new file mode 100644 index 00000000000..b25297b35b0 --- /dev/null +++ b/managedkafka/snippets/connect/connectors/create_bigquery_sink_connector.py @@ -0,0 +1,89 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def create_bigquery_sink_connector( + project_id: str, + region: str, + connect_cluster_id: str, + connector_id: str, + kafka_topic: str, + dataset_id: str, +) -> None: + """ + Creates a BigQuery sink connector. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + connect_cluster_id: ID of the Kafka Connect cluster. + connector_id: ID for the new connector. + kafka_topic: Name of the Kafka topic to read messages from. 
+ dataset_id: ID of the BigQuery dataset to write data to. + + Raises: + This method will raise the GoogleAPICallError exception if the operation errors. + """ + # [START managedkafka_create_bigquery_sink_connector] + from google.api_core.exceptions import GoogleAPICallError + from google.cloud import managedkafka_v1 + from google.cloud.managedkafka_v1.services.managed_kafka_connect import ( + ManagedKafkaConnectClient, + ) + from google.cloud.managedkafka_v1.types import Connector + + # TODO(developer) + # project_id = "my-project-id" + # region = "us-central1" + # connect_cluster_id = "my-connect-cluster" + # connector_id = "my-bigquery-sink" + # kafka_topic = "my-kafka-topic" + # dataset_id = "my_dataset" + + connect_client = ManagedKafkaConnectClient() + + # Here is a sample configuration for the BigQuery sink connector + configs = { + "name": connector_id, + "project": project_id, + "topics": kafka_topic, + "tasks.max": "3", + "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector", + "key.converter": "org.apache.kafka.connect.storage.StringConverter", + "value.converter": "org.apache.kafka.connect.json.JsonConverter", + "value.converter.schemas.enable": "false", + "defaultDataset": dataset_id + } + + connector = Connector() + connector.name = connect_client.connector_path( + project_id, region, connect_cluster_id, connector_id + ) + connector.configs = configs + + request = managedkafka_v1.CreateConnectorRequest( + parent=connect_client.connect_cluster_path(project_id, region, connect_cluster_id), + connector_id=connector_id, + connector=connector, + ) + + try: + operation = connect_client.create_connector(request=request) + print(f"Waiting for operation {operation.operation.name} to complete...") + response = operation.result() + print("Created BigQuery sink connector:", response) + except GoogleAPICallError as e: + print(f"The operation failed with error: {e}") + + # [END managedkafka_create_bigquery_sink_connector] diff --git a/managedkafka/snippets/connect/connectors/create_mirrormaker_connector.py b/managedkafka/snippets/connect/connectors/create_mirrormaker_connector.py new file mode 100644 index 00000000000..b67732dbfad --- /dev/null +++ b/managedkafka/snippets/connect/connectors/create_mirrormaker_connector.py @@ -0,0 +1,99 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def create_mirrormaker_connector( + project_id: str, + region: str, + connect_cluster_id: str, + connector_id: str, + source_cluster_dns: str, + target_cluster_dns: str, + topic_name: str, +) -> None: + """ + Creates a MirrorMaker 2.0 source connector. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + connect_cluster_id: ID of the Kafka Connect cluster. + connector_id: ID for the new connector. + source_cluster_dns: DNS name of the source cluster. + target_cluster_dns: DNS name of the target cluster. + topic_name: Name of the topic to mirror. 
+ + Raises: + This method will raise the GoogleAPICallError exception if the operation errors. + """ + # [START managedkafka_create_mirrormaker_connector] + from google.api_core.exceptions import GoogleAPICallError + from google.cloud import managedkafka_v1 + from google.cloud.managedkafka_v1.services.managed_kafka_connect import ( + ManagedKafkaConnectClient, + ) + from google.cloud.managedkafka_v1.types import Connector + + # TODO(developer) + # project_id = "my-project-id" + # region = "us-central1" + # connect_cluster_id = "my-connect-cluster" + # connector_id = "my-mirrormaker-connector" + # source_cluster_dns = "source-cluster.c.my-project.internal:9092" + # target_cluster_dns = "target-cluster.c.my-project.internal:9092" + # topic_name = "my-topic" + + connect_client = ManagedKafkaConnectClient() + + # Here is a sample configuration for the MirrorMaker source connector + configs = { + "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector", + "name": connector_id, + "source.cluster.alias": "source", + "target.cluster.alias": "target", + "topics": topic_name, + "source.cluster.bootstrap.servers": source_cluster_dns, + "target.cluster.bootstrap.servers": target_cluster_dns, + "offset-syncs.topic.replication.factor": "1", + "source.cluster.security.protocol": "SASL_SSL", + "source.cluster.sasl.mechanism": "OAUTHBEARER", + "source.cluster.sasl.login.callback.handler.class": "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler", + "source.cluster.sasl.jaas.config": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;", + "target.cluster.security.protocol": "SASL_SSL", + "target.cluster.sasl.mechanism": "OAUTHBEARER", + "target.cluster.sasl.login.callback.handler.class": "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler", + "target.cluster.sasl.jaas.config": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;" + } + + connector = Connector() + connector.name = connect_client.connector_path( + project_id, region, connect_cluster_id, connector_id + ) + connector.configs = configs + + request = managedkafka_v1.CreateConnectorRequest( + parent=connect_client.connect_cluster_path(project_id, region, connect_cluster_id), + connector_id=connector_id, + connector=connector, + ) + + try: + operation = connect_client.create_connector(request=request) + print(f"Waiting for operation {operation.operation.name} to complete...") + response = operation.result() + print("Created MirrorMaker connector:", response) + except GoogleAPICallError as e: + print(f"The operation failed with error: {e}") + + # [END managedkafka_create_mirrormaker_connector] diff --git a/managedkafka/snippets/connect/connectors/create_pubsub_sink_connector.py b/managedkafka/snippets/connect/connectors/create_pubsub_sink_connector.py new file mode 100644 index 00000000000..7163df818db --- /dev/null +++ b/managedkafka/snippets/connect/connectors/create_pubsub_sink_connector.py @@ -0,0 +1,95 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + + +def create_pubsub_sink_connector( + project_id: str, + region: str, + connect_cluster_id: str, + connector_id: str, + kafka_topic: str, + pubsub_topic_id: str, + target_project_id: str = None, +) -> None: + """ + Creates a Pub/Sub sink connector. + + Args: + project_id: Google Cloud project ID where the connector will be created. + region: Cloud region. + connect_cluster_id: ID of the Kafka Connect cluster. + connector_id: ID for the new connector. + kafka_topic: Name of the Kafka topic to read messages from. + pubsub_topic_id: ID of the Pub/Sub topic to write messages to. + target_project_id: Project ID containing the Pub/Sub topic. If not provided, + uses the same project as the connector. + + Raises: + This method will raise the GoogleAPICallError exception if the operation errors. + """ + # [START managedkafka_create_pubsub_sink_connector] + from google.api_core.exceptions import GoogleAPICallError + from google.cloud import managedkafka_v1 + from google.cloud.managedkafka_v1.services.managed_kafka_connect import ( + ManagedKafkaConnectClient, + ) + from google.cloud.managedkafka_v1.types import Connector + + # TODO(developer) + # project_id = "my-project-id" + # region = "us-central1" + # connect_cluster_id = "my-connect-cluster" + # connector_id = "my-pubsub-sink" + # kafka_topic = "my-kafka-topic" + # pubsub_topic_id = "my-pubsub-topic" + # target_project_id = "my-target-project" # Optional + + if target_project_id is None: + target_project_id = project_id + + connect_client = ManagedKafkaConnectClient() + + # Here is a sample configuration for the Pub/Sub sink connector + configs = { + "connector.class": "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector", + "name": connector_id, + "tasks.max": "1", + "topics": kafka_topic, + "value.converter": "org.apache.kafka.connect.storage.StringConverter", + "key.converter": "org.apache.kafka.connect.storage.StringConverter", + "cps.topic": pubsub_topic_id, + "cps.project": target_project_id + } + + connector = Connector() + connector.name = connect_client.connector_path( + project_id, region, connect_cluster_id, connector_id + ) + connector.configs = configs + + request = managedkafka_v1.CreateConnectorRequest( + parent=connect_client.connect_cluster_path(project_id, region, connect_cluster_id), + connector_id=connector_id, + connector=connector, + ) + + try: + operation = connect_client.create_connector(request=request) + print(f"Waiting for operation {operation.operation.name} to complete...") + response = operation.result() + print("Created Pub/Sub sink connector:", response) + except GoogleAPICallError as e: + print(f"The operation failed with error: {e}") + + # [END managedkafka_create_pubsub_sink_connector] diff --git a/managedkafka/snippets/connect/connectors/create_pubsub_source_connector.py b/managedkafka/snippets/connect/connectors/create_pubsub_source_connector.py new file mode 100644 index 00000000000..23f5a75bd77 --- /dev/null +++ b/managedkafka/snippets/connect/connectors/create_pubsub_source_connector.py @@ -0,0 +1,95 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def create_pubsub_source_connector( + project_id: str, + region: str, + connect_cluster_id: str, + connector_id: str, + kafka_topic: str, + subscription_id: str, + source_project_id: str = None, +) -> None: + """ + Creates a Pub/Sub source connector. + + Args: + project_id: Google Cloud project ID where the connector will be created. + region: Cloud region. + connect_cluster_id: ID of the Kafka Connect cluster. + connector_id: ID for the new connector. + kafka_topic: Name of the Kafka topic to write messages to. + subscription_id: ID of the Pub/Sub subscription to read messages from. + source_project_id: Project ID containing the Pub/Sub subscription. If not provided, + uses the same project as the connector. + + Raises: + This method will raise the GoogleAPICallError exception if the operation errors. + """ + # [START managedkafka_create_pubsub_source_connector] + from google.api_core.exceptions import GoogleAPICallError + from google.cloud import managedkafka_v1 + from google.cloud.managedkafka_v1.services.managed_kafka_connect import ( + ManagedKafkaConnectClient, + ) + from google.cloud.managedkafka_v1.types import Connector + + # TODO(developer) + # project_id = "my-project-id" + # region = "us-central1" + # connect_cluster_id = "my-connect-cluster" + # connector_id = "my-pubsub-source" + # kafka_topic = "my-kafka-topic" + # subscription_id = "my-pubsub-subscription" + # source_project_id = "my-source-project" # Optional + + if source_project_id is None: + source_project_id = project_id + + connect_client = ManagedKafkaConnectClient() + + # Here is a sample configuration for the Pub/Sub source connector + configs = { + "connector.class": "com.google.pubsub.kafka.source.CloudPubSubSourceConnector", + "name": connector_id, + "tasks.max": "1", + "kafka.topic": kafka_topic, + "cps.subscription": subscription_id, + "cps.project": source_project_id, + "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter", + "key.converter": "org.apache.kafka.connect.storage.StringConverter" + } + + connector = Connector() + connector.name = connect_client.connector_path( + project_id, region, connect_cluster_id, connector_id + ) + connector.configs = configs + + request = managedkafka_v1.CreateConnectorRequest( + parent=connect_client.connect_cluster_path(project_id, region, connect_cluster_id), + connector_id=connector_id, + connector=connector, + ) + + try: + operation = connect_client.create_connector(request=request) + print(f"Waiting for operation {operation.operation.name} to complete...") + response = operation.result() + print("Created Pub/Sub source connector:", response) + except GoogleAPICallError as e: + print(f"The operation failed with error: {e}") + + # [END managedkafka_create_pubsub_source_connector] diff --git a/managedkafka/snippets/connect/connectors/create_storage_sink_connector.py b/managedkafka/snippets/connect/connectors/create_storage_sink_connector.py new file mode 100644 index 00000000000..ae6831399fa --- /dev/null +++ b/managedkafka/snippets/connect/connectors/create_storage_sink_connector.py @@ -0,0 +1,90 @@ +# Copyright 2025 Google LLC +# +# 
Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def create_storage_sink_connector( + project_id: str, + region: str, + connect_cluster_id: str, + connector_id: str, + kafka_topic: str, + bucket_name: str, +) -> None: + """ + Creates a Cloud Storage sink connector. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + connect_cluster_id: ID of the Kafka Connect cluster. + connector_id: ID for the new connector. + kafka_topic: Name of the Kafka topic to read messages from. + bucket_name: Name of the Cloud Storage bucket to write files to. + + Raises: + This method will raise the GoogleAPICallError exception if the operation errors. + """ + # [START managedkafka_create_storage_sink_connector] + from google.api_core.exceptions import GoogleAPICallError + from google.cloud import managedkafka_v1 + from google.cloud.managedkafka_v1.services.managed_kafka_connect import ( + ManagedKafkaConnectClient, + ) + from google.cloud.managedkafka_v1.types import Connector + + # TODO(developer) + # project_id = "my-project-id" + # region = "us-central1" + # connect_cluster_id = "my-connect-cluster" + # connector_id = "my-gcs-sink" + # kafka_topic = "my-kafka-topic" + # bucket_name = "my-gcs-bucket" + + connect_client = ManagedKafkaConnectClient() + + # Here is a sample configuration for the Cloud Storage sink connector + configs = { + "connector.class": "io.aiven.kafka.connect.gcs.GcsSinkConnector", + "tasks.max": "1", + "topics": kafka_topic, + "gcs.bucket.name": bucket_name, + "gcs.credentials.default": "true", + "format.output.type": "json", + "name": connector_id, + "value.converter": "org.apache.kafka.connect.json.JsonConverter", + "value.converter.schemas.enable": "false", + "key.converter": "org.apache.kafka.connect.storage.StringConverter" + } + + connector = Connector() + connector.name = connect_client.connector_path( + project_id, region, connect_cluster_id, connector_id + ) + connector.configs = configs + + request = managedkafka_v1.CreateConnectorRequest( + parent=connect_client.connect_cluster_path(project_id, region, connect_cluster_id), + connector_id=connector_id, + connector=connector, + ) + + try: + operation = connect_client.create_connector(request=request) + print(f"Waiting for operation {operation.operation.name} to complete...") + response = operation.result() + print("Created Cloud Storage sink connector:", response) + except GoogleAPICallError as e: + print(f"The operation failed with error: {e}") + + # [END managedkafka_create_storage_sink_connector] diff --git a/managedkafka/snippets/connect/connectors/delete_connector.py b/managedkafka/snippets/connect/connectors/delete_connector.py new file mode 100644 index 00000000000..933c07922ac --- /dev/null +++ b/managedkafka/snippets/connect/connectors/delete_connector.py @@ -0,0 +1,61 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def delete_connector( + project_id: str, + region: str, + connect_cluster_id: str, + connector_id: str, +) -> None: + """ + Delete a connector. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + connect_cluster_id: ID of the Kafka Connect cluster. + connector_id: ID of the connector. + + Raises: + This method will raise the GoogleAPICallError exception if the operation errors. + """ + # [START managedkafka_delete_connector] + from google.api_core.exceptions import GoogleAPICallError + from google.cloud.managedkafka_v1.services.managed_kafka_connect import ( + ManagedKafkaConnectClient, + ) + from google.cloud import managedkafka_v1 + + # TODO(developer) + # project_id = "my-project-id" + # region = "us-central1" + # connect_cluster_id = "my-connect-cluster" + # connector_id = "my-connector" + + client = ManagedKafkaConnectClient() + + request = managedkafka_v1.DeleteConnectorRequest( + name=client.connector_path(project_id, region, connect_cluster_id, connector_id), + ) + + try: + operation = client.delete_connector(request=request) + print(f"Waiting for operation {operation.operation.name} to complete...") + operation.result() + print("Deleted connector") + except GoogleAPICallError as e: + print(f"The operation failed with error: {e}") + + # [END managedkafka_delete_connector] diff --git a/managedkafka/snippets/connect/connectors/get_connector.py b/managedkafka/snippets/connect/connectors/get_connector.py new file mode 100644 index 00000000000..c4dec256e3f --- /dev/null +++ b/managedkafka/snippets/connect/connectors/get_connector.py @@ -0,0 +1,60 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def get_connector( + project_id: str, + region: str, + connect_cluster_id: str, + connector_id: str, +) -> None: + """ + Get details of a specific connector. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + connect_cluster_id: ID of the Kafka Connect cluster. + connector_id: ID of the connector. + + Raises: + This method will raise the NotFound exception if the connector is not found. 
+ """ + # [START managedkafka_get_connector] + from google.api_core.exceptions import NotFound + from google.cloud.managedkafka_v1.services.managed_kafka_connect import ManagedKafkaConnectClient + from google.cloud import managedkafka_v1 + + # TODO(developer) + # project_id = "my-project-id" + # region = "us-central1" + # connect_cluster_id = "my-connect-cluster" + # connector_id = "my-connector" + + client = ManagedKafkaConnectClient() + + connector_path = client.connector_path( + project_id, region, connect_cluster_id, connector_id + ) + request = managedkafka_v1.GetConnectorRequest( + name=connector_path, + ) + + try: + connector = client.get_connector(request=request) + print("Got connector:", connector) + except NotFound as e: + print(f"Failed to get connector {connector_id} with error: {e}") + + # [END managedkafka_get_connector] diff --git a/managedkafka/snippets/connect/connectors/list_connectors.py b/managedkafka/snippets/connect/connectors/list_connectors.py new file mode 100644 index 00000000000..f707df09454 --- /dev/null +++ b/managedkafka/snippets/connect/connectors/list_connectors.py @@ -0,0 +1,54 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def list_connectors( + project_id: str, + region: str, + connect_cluster_id: str, +) -> None: + """ + List all connectors in a Kafka Connect cluster. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + connect_cluster_id: ID of the Kafka Connect cluster. + """ + # [START managedkafka_list_connectors] + from google.cloud import managedkafka_v1 + from google.cloud.managedkafka_v1.services.managed_kafka_connect import ( + ManagedKafkaConnectClient, + ) + from google.api_core.exceptions import GoogleAPICallError + + # TODO(developer) + # project_id = "my-project-id" + # region = "us-central1" + # connect_cluster_id = "my-connect-cluster" + + connect_client = ManagedKafkaConnectClient() + + request = managedkafka_v1.ListConnectorsRequest( + parent=connect_client.connect_cluster_path(project_id, region, connect_cluster_id), + ) + + try: + response = connect_client.list_connectors(request=request) + for connector in response: + print("Got connector:", connector) + except GoogleAPICallError as e: + print(f"Failed to list connectors with error: {e}") + + # [END managedkafka_list_connectors] diff --git a/managedkafka/snippets/connect/connectors/pause_connector.py b/managedkafka/snippets/connect/connectors/pause_connector.py new file mode 100644 index 00000000000..e1d9a5a016a --- /dev/null +++ b/managedkafka/snippets/connect/connectors/pause_connector.py @@ -0,0 +1,61 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def pause_connector( + project_id: str, + region: str, + connect_cluster_id: str, + connector_id: str, +) -> None: + """ + Pause a connector. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + connect_cluster_id: ID of the Kafka Connect cluster. + connector_id: ID of the connector. + + Raises: + This method will raise the GoogleAPICallError exception if the operation errors. + """ + # [START managedkafka_pause_connector] + from google.api_core.exceptions import GoogleAPICallError + from google.cloud.managedkafka_v1.services.managed_kafka_connect import ( + ManagedKafkaConnectClient, + ) + from google.cloud import managedkafka_v1 + + # TODO(developer) + # project_id = "my-project-id" + # region = "us-central1" + # connect_cluster_id = "my-connect-cluster" + # connector_id = "my-connector" + + client = ManagedKafkaConnectClient() + + request = managedkafka_v1.PauseConnectorRequest( + name=client.connector_path(project_id, region, connect_cluster_id, connector_id), + ) + + try: + operation = client.pause_connector(request=request) + print(f"Waiting for operation {operation.operation.name} to complete...") + operation.result() + print(f"Paused connector {connector_id}") + except GoogleAPICallError as e: + print(f"Failed to pause connector {connector_id} with error: {e}") + + # [END managedkafka_pause_connector] diff --git a/managedkafka/snippets/connect/connectors/requirements.txt b/managedkafka/snippets/connect/connectors/requirements.txt new file mode 100644 index 00000000000..5f372e81c41 --- /dev/null +++ b/managedkafka/snippets/connect/connectors/requirements.txt @@ -0,0 +1,6 @@ +protobuf==5.29.4 +pytest==8.2.2 +google-api-core==2.23.0 +google-auth==2.38.0 +google-cloud-managedkafka==0.1.12 +googleapis-common-protos==1.66.0 diff --git a/managedkafka/snippets/connect/connectors/restart_connector.py b/managedkafka/snippets/connect/connectors/restart_connector.py new file mode 100644 index 00000000000..5c713ba9b41 --- /dev/null +++ b/managedkafka/snippets/connect/connectors/restart_connector.py @@ -0,0 +1,61 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def restart_connector( + project_id: str, + region: str, + connect_cluster_id: str, + connector_id: str, +) -> None: + """ + Restart a connector. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + connect_cluster_id: ID of the Kafka Connect cluster. + connector_id: ID of the connector. + + Raises: + This method will raise the GoogleAPICallError exception if the operation errors. 
+ """ + # [START managedkafka_restart_connector] + from google.api_core.exceptions import GoogleAPICallError + from google.cloud.managedkafka_v1.services.managed_kafka_connect import ( + ManagedKafkaConnectClient, + ) + from google.cloud import managedkafka_v1 + + # TODO(developer) + # project_id = "my-project-id" + # region = "us-central1" + # connect_cluster_id = "my-connect-cluster" + # connector_id = "my-connector" + + client = ManagedKafkaConnectClient() + + request = managedkafka_v1.RestartConnectorRequest( + name=client.connector_path(project_id, region, connect_cluster_id, connector_id), + ) + + try: + operation = client.restart_connector(request=request) + print(f"Waiting for operation {operation.operation.name} to complete...") + operation.result() + print(f"Restarted connector {connector_id}") + except GoogleAPICallError as e: + print(f"Failed to restart connector {connector_id} with error: {e}") + + # [END managedkafka_restart_connector] diff --git a/managedkafka/snippets/connect/connectors/resume_connector.py b/managedkafka/snippets/connect/connectors/resume_connector.py new file mode 100644 index 00000000000..1f415240252 --- /dev/null +++ b/managedkafka/snippets/connect/connectors/resume_connector.py @@ -0,0 +1,61 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def resume_connector( + project_id: str, + region: str, + connect_cluster_id: str, + connector_id: str, +) -> None: + """ + Resume a paused connector. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + connect_cluster_id: ID of the Kafka Connect cluster. + connector_id: ID of the connector. + + Raises: + This method will raise the GoogleAPICallError exception if the operation errors. 
+ """ + # [START managedkafka_resume_connector] + from google.api_core.exceptions import GoogleAPICallError + from google.cloud.managedkafka_v1.services.managed_kafka_connect import ( + ManagedKafkaConnectClient, + ) + from google.cloud import managedkafka_v1 + + # TODO(developer) + # project_id = "my-project-id" + # region = "us-central1" + # connect_cluster_id = "my-connect-cluster" + # connector_id = "my-connector" + + client = ManagedKafkaConnectClient() + + request = managedkafka_v1.ResumeConnectorRequest( + name=client.connector_path(project_id, region, connect_cluster_id, connector_id), + ) + + try: + operation = client.resume_connector(request=request) + print(f"Waiting for operation {operation.operation.name} to complete...") + operation.result() + print(f"Resumed connector {connector_id}") + except GoogleAPICallError as e: + print(f"Failed to resume connector {connector_id} with error: {e}") + + # [END managedkafka_resume_connector] diff --git a/managedkafka/snippets/connect/connectors/stop_connector.py b/managedkafka/snippets/connect/connectors/stop_connector.py new file mode 100644 index 00000000000..eff636c79db --- /dev/null +++ b/managedkafka/snippets/connect/connectors/stop_connector.py @@ -0,0 +1,61 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def stop_connector( + project_id: str, + region: str, + connect_cluster_id: str, + connector_id: str, +) -> None: + """ + Stop a connector. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + connect_cluster_id: ID of the Kafka Connect cluster. + connector_id: ID of the connector. + + Raises: + This method will raise the GoogleAPICallError exception if the operation errors. 
+ """ + # [START managedkafka_stop_connector] + from google.api_core.exceptions import GoogleAPICallError + from google.cloud.managedkafka_v1.services.managed_kafka_connect import ( + ManagedKafkaConnectClient, + ) + from google.cloud import managedkafka_v1 + + # TODO(developer) + # project_id = "my-project-id" + # region = "us-central1" + # connect_cluster_id = "my-connect-cluster" + # connector_id = "my-connector" + + client = ManagedKafkaConnectClient() + + request = managedkafka_v1.StopConnectorRequest( + name=client.connector_path(project_id, region, connect_cluster_id, connector_id), + ) + + try: + operation = client.stop_connector(request=request) + print(f"Waiting for operation {operation.operation.name} to complete...") + operation.result() + print(f"Stopped connector {connector_id}") + except GoogleAPICallError as e: + print(f"Failed to stop connector {connector_id} with error: {e}") + + # [END managedkafka_stop_connector] diff --git a/managedkafka/snippets/connect/connectors/update_connector.py b/managedkafka/snippets/connect/connectors/update_connector.py new file mode 100644 index 00000000000..7581151b18b --- /dev/null +++ b/managedkafka/snippets/connect/connectors/update_connector.py @@ -0,0 +1,79 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def update_connector( + project_id: str, + region: str, + connect_cluster_id: str, + connector_id: str, + configs: dict, +) -> None: + """ + Update a connector's configuration. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + connect_cluster_id: ID of the Kafka Connect cluster. + connector_id: ID of the connector. + config: Dictionary containing the updated configuration. + + Raises: + This method will raise the GoogleAPICallError exception if the operation errors. + """ + # [START managedkafka_update_connector] + from google.api_core.exceptions import GoogleAPICallError + from google.cloud import managedkafka_v1 + from google.cloud.managedkafka_v1.services.managed_kafka_connect import ( + ManagedKafkaConnectClient, + ) + from google.cloud.managedkafka_v1.types import Connector + from google.protobuf import field_mask_pb2 + + # TODO(developer) + # project_id = "my-project-id" + # region = "us-central1" + # connect_cluster_id = "my-connect-cluster" + # connector_id = "my-connector" + # configs = { + # "tasks.max": "2", + # "value.converter.schemas.enable" : "true" + # } + + connect_client = ManagedKafkaConnectClient() + + connector = Connector() + connector.name = connect_client.connector_path( + project_id, region, connect_cluster_id, connector_id + ) + connector.configs = configs + update_mask = field_mask_pb2.FieldMask() + update_mask.paths.append("config") + + # For a list of editable fields, one can check https://cloud.google.com/managed-service-for-apache-kafka/docs/connect-cluster/update-connector#editable-properties. 
+ request = managedkafka_v1.UpdateConnectorRequest( + update_mask=update_mask, + connector=connector, + ) + + try: + operation = connect_client.update_connector(request=request) + print(f"Waiting for operation {operation.operation.name} to complete...") + response = operation.result() + print("Updated connector:", response) + except GoogleAPICallError as e: + print(f"The operation failed with error: {e}") + + # [END managedkafka_update_connector] From 8084f5b1bf562fd22fcbcc21f65323a5d4039f9d Mon Sep 17 00:00:00 2001 From: salmany Date: Thu, 31 Jul 2025 20:00:31 +0000 Subject: [PATCH 13/14] Minor fixes. --- .../snippets/connect/clusters/list_connect_clusters.py | 10 +++++----- .../connect/clusters/update_connect_cluster.py | 2 -- .../snippets/connect/connectors/connectors_test.py | 2 +- .../connect/connectors/create_connectors_test.py | 0 .../snippets/connect/connectors/update_connector.py | 4 ++-- 5 files changed, 8 insertions(+), 10 deletions(-) create mode 100644 managedkafka/snippets/connect/connectors/create_connectors_test.py diff --git a/managedkafka/snippets/connect/clusters/list_connect_clusters.py b/managedkafka/snippets/connect/clusters/list_connect_clusters.py index 749a5267d91..00c9dc8585a 100644 --- a/managedkafka/snippets/connect/clusters/list_connect_clusters.py +++ b/managedkafka/snippets/connect/clusters/list_connect_clusters.py @@ -41,11 +41,11 @@ def list_connect_clusters( parent=connect_client.common_location_path(project_id, region), ) - response = connect_client.list_connect_clusters(request=request) - for cluster in response: - try: + try: + response = connect_client.list_connect_clusters(request=request) + for cluster in response: print("Got Connect cluster:", cluster) - except GoogleAPICallError as e: - print(f"Failed to list Connect clusters with error: {e}") + except GoogleAPICallError as e: + print(f"Failed to list Connect clusters with error: {e}") # [END managedkafka_list_connect_clusters] diff --git a/managedkafka/snippets/connect/clusters/update_connect_cluster.py b/managedkafka/snippets/connect/clusters/update_connect_cluster.py index f7ed0d0248e..36ec1c5ec06 100644 --- a/managedkafka/snippets/connect/clusters/update_connect_cluster.py +++ b/managedkafka/snippets/connect/clusters/update_connect_cluster.py @@ -63,8 +63,6 @@ def update_connect_cluster( try: operation = connect_client.update_connect_cluster(request=request) print(f"Waiting for operation {operation.operation.name} to complete...") - # Updating a Connect cluster can take 10-40 minutes. 
- operation.result(timeout=3000) response = operation.result() print("Updated Connect cluster:", response) except GoogleAPICallError as e: diff --git a/managedkafka/snippets/connect/connectors/connectors_test.py b/managedkafka/snippets/connect/connectors/connectors_test.py index 1f1cf8f13d3..d9d6e3df585 100644 --- a/managedkafka/snippets/connect/connectors/connectors_test.py +++ b/managedkafka/snippets/connect/connectors/connectors_test.py @@ -276,7 +276,7 @@ def test_update_connector( ) -> None: configs = { "tasks.max": "2", - "value.converter.schemas.enable" : "true" + "value.converter.schemas.enable": "true" } operation = mock.MagicMock(spec=Operation) connector = managedkafka_v1.types.Connector() diff --git a/managedkafka/snippets/connect/connectors/create_connectors_test.py b/managedkafka/snippets/connect/connectors/create_connectors_test.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/managedkafka/snippets/connect/connectors/update_connector.py b/managedkafka/snippets/connect/connectors/update_connector.py index 7581151b18b..1f678154a97 100644 --- a/managedkafka/snippets/connect/connectors/update_connector.py +++ b/managedkafka/snippets/connect/connectors/update_connector.py @@ -28,7 +28,7 @@ def update_connector( region: Cloud region. connect_cluster_id: ID of the Kafka Connect cluster. connector_id: ID of the connector. - config: Dictionary containing the updated configuration. + configs: Dictionary containing the updated configuration. Raises: This method will raise the GoogleAPICallError exception if the operation errors. @@ -49,7 +49,7 @@ def update_connector( # connector_id = "my-connector" # configs = { # "tasks.max": "2", - # "value.converter.schemas.enable" : "true" + # "value.converter.schemas.enable": "true" # } connect_client = ManagedKafkaConnectClient() From 328ab0c51823d8772cfcaccf2c2a14e186d6818b Mon Sep 17 00:00:00 2001 From: salmany Date: Thu, 31 Jul 2025 20:03:42 +0000 Subject: [PATCH 14/14] Remove redundant file. --- .../snippets/connect/connectors/create_connectors_test.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 managedkafka/snippets/connect/connectors/create_connectors_test.py diff --git a/managedkafka/snippets/connect/connectors/create_connectors_test.py b/managedkafka/snippets/connect/connectors/create_connectors_test.py deleted file mode 100644 index e69de29bb2d..00000000000