-
Notifications
You must be signed in to change notification settings - Fork 0
PNP Workflow - Set authorize flag enhancement #99
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
PNP Workflow - Set authorize flag enhancement #99
Conversation
…talyst-center-ansible-dev into pnp_workflow_enhancement
…talyst-center-ansible-dev into pnp_workflow_enhancement
…talyst-center-ansible-dev into pnp_workflow_enhancement
…talyst-center-ansible-dev into pnp_workflow_enhancement
|
||
Returns: | ||
dict: The API response if the authorization is successful. | ||
None: If the authorization fails or an unexpected response is received. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Description:
This function authorizes a PnP device by setting the authorization flag, which moves the device
from "Pending Authorization" state to "Authorized" state. This is required for devices to be
provisioned after being claimed to a site. The function is supported from Cisco Catalyst Center
release version 2.3.7.6 onwards and handles both successful and failed authorization scenarios.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated the docstring also updated 2.3.7.9
dict: The API response if the authorization is successful. | ||
None: If the authorization fails or an unexpected response is received. | ||
""" | ||
self.log("Authorizing the device with device ID '{0}'.".format(device_id), "DEBUG") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
self.log("Initiating device authorization process for device ID: '{0}'".format(device_id), "DEBUG")
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated the log
op_modifies=True | ||
) | ||
self.log( | ||
"Response from 'authorize_device' API for device authorization: {0}".format( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"Received API response from 'authorize_device' for device ID '{0}': {1}".format(
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated the log
"DEBUG", | ||
) | ||
|
||
if authorize_response and isinstance(authorize_response, dict): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
self.log("Device authorization completed successfully for device ID: '{0}'".format(device_id), "INFO")
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added the log
if authorize_response and isinstance(authorize_response, dict): | ||
return authorize_response | ||
|
||
self.log("Received unexpected response from 'authorize_device' API for device ID {0}". |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
self.log(
"Received unexpected response format from 'authorize_device' API for device ID '{0}' - expected dict, got: {1}".format(
device_id, type(authorize_response).__name__
),
"ERROR"
)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated the log
device_info = device.get("deviceInfo", {}) | ||
serial_number = device_info.get("serialNumber") | ||
|
||
for each_config in self.config: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can split this into three steps... Can you check and update?
- Identify devices that need authorization based on config
- Process authorization for devices that require it
- Generate final status summary
# First, identify devices that need authorization based on config
for device in processed_devices:
device_info = device.get("deviceInfo", {})
serial_number = device_info.get("serialNumber")
if not serial_number:
self.log("Device missing serial number - skipping authorization check: {0}".format(device), "WARNING")
continue
self.log("Checking authorization requirements for device: '{0}'".format(serial_number), "DEBUG")
# Check if this device has authorize flag set in config
authorization_required = False
for each_config in self.config:
input_device_info = each_config.get("device_info", [])
for each_info in input_device_info:
if (each_info.get("serialNumber") == serial_number and
each_info.get("authorize") is True):
authorization_required = True
self.log("Device '{0}' requires authorization based on config".format(serial_number), "DEBUG")
break
if authorization_required:
break
if authorization_required:
devices_requiring_auth.append(serial_number)
else:
self.log("Device '{0}' does not require authorization (authorize flag not set)".format(serial_number), "DEBUG")
if not devices_requiring_auth:
self.log("No devices require authorization based on configuration", "INFO")
return True, []
self.log("Found {0} device(s) requiring authorization: {1}".format(
len(devices_requiring_auth), devices_requiring_auth), "INFO")
# Process authorization for devices that require it
for serial_number in devices_requiring_auth:
self.log("Processing authorization for device: '{0}'".format(serial_number), "DEBUG")
device_response = self.get_device_list_pnp(serial_number)
if not device_response or not isinstance(device_response, dict):
self.log("Unable to retrieve device details for serial number: '{0}' - skipping authorization".format(
serial_number), "WARNING")
unauthorized_devices.append(serial_number)
continue
device_info = device_response.get("deviceInfo", {})
current_state = device_info.get("state")
device_id = device_response.get("id")
self.log("Device '{0}' current state: '{1}'".format(serial_number, current_state), "DEBUG")
if current_state != "Pending Authorization":
self.log("Device '{0}' is not in 'Pending Authorization' state (current: '{1}') - skipping authorization".format(
serial_number, current_state), "INFO")
unauthorized_devices.append(serial_number)
continue
if not device_id:
self.log("Device '{0}' missing device ID - cannot authorize".format(serial_number), "ERROR")
unauthorized_devices.append(serial_number)
continue
# Attempt device authorization
self.log("Attempting to authorize device '{0}' with ID '{1}'".format(serial_number, device_id), "INFO")
authorize_response = self.authorize_device(device_id)
self.log("Authorization response for device '{0}': {1}".format(
serial_number, self.pprint(authorize_response)), "DEBUG")
if authorize_response and isinstance(authorize_response, dict):
self.log("Device '{0}' authorized successfully".format(serial_number), "INFO")
authorized_devices.append(serial_number)
else:
error_msg = str(authorize_response) if authorize_response else "No response received"
self.log("Failed to authorize device '{0}': {1}".format(serial_number, error_msg), "ERROR")
unauthorized_devices.append(serial_number)
# Generate final status summary
total_auth_required = len(devices_requiring_auth)
auth_success_count = len(authorized_devices)
auth_failed_count = len(unauthorized_devices)
self.log("Bulk authorization completed - Required: {0}, Successful: {1}, Failed: {2}".format(
total_auth_required, auth_success_count, auth_failed_count), "INFO")
if authorized_devices:
self.log("Successfully authorized devices: {0}".format(authorized_devices), "INFO")
if unauthorized_devices:
self.log("Failed to authorize devices: {0}".format(unauthorized_devices), "WARNING")
# Return success status and appropriate device list
if authorized_devices and not unauthorized_devices:
self.log("All devices requiring authorization were successfully authorized", "INFO")
return True, authorized_devices
if unauthorized_devices:
self.log("Some devices failed authorization or were not eligible", "WARNING")
return False, unauthorized_devices
# This should not be reached, but included for completeness
self.log("No authorization operations were performed", "INFO")
return True, []
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Replaced the code.
) | ||
self.update_device_info( | ||
existing_device_info, device_info, device_response.get("id") | ||
) | ||
|
||
if authorize_flag and self.compare_dnac_versions(self.get_ccc_version(), "2.3.7.6") > 0 \ | ||
and claim_stat == "Pending Authorization": |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
self.log("Initiating device authorization process for device '{0}' - Version: {1}, State: {2}".format(
serial_number, current_version, claim_stat), "INFO")
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated the log
|
||
if authorize_flag and self.compare_dnac_versions(self.get_ccc_version(), "2.3.7.6") > 0 \ | ||
and claim_stat == "Pending Authorization": | ||
authorize_response = self.authorize_device(device_response.get("id")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what happens if we failed to get device id?
device_id = device_response.get("id")
if not device_id:
self.log("Device ID not found for device '{0}' - cannot proceed with authorization".format(
serial_number), "ERROR")
else:
authorize_response = self.authorize_device(device_id)
self.log("Authorization API response for device '{0}': {1}".format(
serial_number, self.pprint(authorize_response)), "DEBUG")
if authorize_response and isinstance(authorize_response, dict):
self.log("Device '{0}' authorized successfully and moved from 'Pending Authorization' state".format(
serial_number), "INFO")
else:
error_msg = str(authorize_response) if authorize_response else "No response received"
self.log("Failed to authorize device '{0}': {1}".format(
serial_number, error_msg), "ERROR")
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated the code.
@@ -1544,6 +1699,23 @@ class instance for further use. | |||
|
|||
if self.have["deviceInfo"]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
save it in local variables and reuse it.. For example self.want.get("serial_number")) used in many places..
self.result["msg"] = "Only Device Added Successfully"
self.log("Device successfully added to PnP database", "INFO")
# Check if device requires authorization based on state and version compatibility
device_state = self.have["deviceInfo"].get("state")
current_version = self.get_ccc_version()
device_id = dev_add_response.get("id")
serial_number = self.want.get("serial_number")
self.log("Device '{0}' current state: '{1}', Catalyst Center version: '{2}'".format(
serial_number, device_state, current_version), "DEBUG")
# Check authorization requirements
if (device_state == "Pending Authorization" and
self.compare_dnac_versions(current_version, "2.3.7.6") >= 0):
self.log("Device '{0}' is in 'Pending Authorization' state and version supports authorization - proceeding with authorization".format(
serial_number), "INFO")
if not device_id:
self.log("Device ID not found for device '{0}' - cannot proceed with authorization".format(
serial_number), "ERROR")
self.result["msg"] += ". Unable to authorize Device '{0}' - missing device ID.".format(serial_number)
else:
self.log("Initiating authorization process for device '{0}' with ID '{1}'".format(
serial_number, device_id), "DEBUG")
authorize_response = self.authorize_device(device_id)
self.log("Authorization API response for device '{0}': {1}".format(
serial_number, self.pprint(authorize_response)), "DEBUG")
if authorize_response and isinstance(authorize_response, dict):
self.log("Device '{0}' authorization completed successfully".format(serial_number), "INFO")
self.result["msg"] += ". Device '{0}' authorized successfully.".format(serial_number)
else:
error_msg = str(authorize_response) if authorize_response else "No response received"
self.log("Failed to authorize device '{0}': {1}".format(serial_number, error_msg), "ERROR")
self.result["msg"] += ". Unable to authorize Device '{0}' - {1}.".format(serial_number, error_msg)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated the same with 2.3.7.9
@@ -1571,6 +1743,19 @@ class instance for further use. | |||
claim_params = self.get_claim_params() | |||
claim_params["deviceId"] = dev_add_response.get("id") | |||
|
|||
if self.have["deviceInfo"].get("state") == "Pending Authorization" \ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
# Check if device requires authorization based on state and version compatibility
device_state = self.have["deviceInfo"].get("state")
current_version = self.get_ccc_version()
device_id = dev_add_response.get("id")
serial_number = self.want.get("serial_number")
self.log("Device addition completed - checking authorization requirements for device '{0}'".format(serial_number), "DEBUG")
self.log("Device '{0}' current state: '{1}', Catalyst Center version: '{2}'".format(
serial_number, device_state, current_version), "DEBUG")
# Process device authorization if conditions are met
if (device_state == "Pending Authorization" and
self.compare_dnac_versions(current_version, "2.3.7.6") >= 0):
self.log("Device '{0}' is in 'Pending Authorization' state and version supports authorization - initiating authorization process".format(
serial_number), "INFO")
if not device_id:
self.log("Device ID not found for device '{0}' - cannot proceed with authorization".format(
serial_number), "ERROR")
self.result["msg"] += ". Unable to authorize Device '{0}' - missing device ID.".format(serial_number)
else:
self.log("Attempting device authorization for device '{0}' with ID '{1}'".format(
serial_number, device_id), "DEBUG")
authorize_response = self.authorize_device(device_id)
self.log("Authorization API response for device '{0}': {1}".format(
serial_number, self.pprint(authorize_response)), "DEBUG")
if authorize_response and isinstance(authorize_response, dict):
self.log("Device '{0}' authorization completed successfully and moved from 'Pending Authorization' state".format(
serial_number), "INFO")
self.result["msg"] += ". Device '{0}' authorized successfully.".format(serial_number)
else:
error_msg = str(authorize_response) if authorize_response else "No response received"
self.log("Failed to authorize device '{0}': {1}".format(serial_number, error_msg), "ERROR")
self.result["msg"] += ". Unable to authorize Device '{0}' - {1}.".format(serial_number, error_msg)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated the code.
@@ -92,6 +92,12 @@ | |||
flag. | |||
type: bool | |||
required: false | |||
authorize: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please write detailed documentation on what we are doing. Check with @DNACENSolutions and get approval for the documentation.
authorize:
description:
- Set the authorization flag for PnP devices to enable provisioning after claiming.
- When set to true, devices in "Pending Authorization" state will be automatically authorized.
- This flag moves devices from "Pending Authorization" to "Authorized" state, allowing them to proceed with the provisioning workflow.
- Authorization is performed after successful device import (bulk operations) or device addition (single device operations).
- If not specified, devices will remain in their current authorization state and may require manual authorization.
- This parameter only applies to devices that support the authorization workflow in their PnP process.
- Authorization is skipped for devices that are not in "Pending Authorization" state.
- Supported from Cisco Catalyst Center release version 2.3.7.6 onwards.
type: bool
required: false
default: false
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated the same. also changed from 2.3.7.9 onward, not available on 2.3.7.6
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed code review comments
@@ -92,6 +92,12 @@ | |||
flag. | |||
type: bool | |||
required: false | |||
authorize: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated the same. also changed from 2.3.7.9 onward, not available on 2.3.7.6
|
||
Returns: | ||
dict: The API response if the authorization is successful. | ||
None: If the authorization fails or an unexpected response is received. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated the docstring also updated 2.3.7.9
dict: The API response if the authorization is successful. | ||
None: If the authorization fails or an unexpected response is received. | ||
""" | ||
self.log("Authorizing the device with device ID '{0}'.".format(device_id), "DEBUG") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated the log
op_modifies=True | ||
) | ||
self.log( | ||
"Response from 'authorize_device' API for device authorization: {0}".format( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated the log
"DEBUG", | ||
) | ||
|
||
if authorize_response and isinstance(authorize_response, dict): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added the log
|
||
if authorize_flag and self.compare_dnac_versions(self.get_ccc_version(), "2.3.7.6") > 0 \ | ||
and claim_stat == "Pending Authorization": | ||
authorize_response = self.authorize_device(device_response.get("id")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated the code.
self.result['diff'] = self.validated_config | ||
self.result['changed'] = True | ||
|
||
if self.compare_dnac_versions(self.get_ccc_version(), "2.3.7.6") > 0: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated the same code.
device_info = device.get("deviceInfo", {}) | ||
serial_number = device_info.get("serialNumber") | ||
|
||
for each_config in self.config: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Replaced the code.
@@ -1544,6 +1699,23 @@ class instance for further use. | |||
|
|||
if self.have["deviceInfo"]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated the same with 2.3.7.9
@@ -1571,6 +1743,19 @@ class instance for further use. | |||
claim_params = self.get_claim_params() | |||
claim_params["deviceId"] = dev_add_response.get("id") | |||
|
|||
if self.have["deviceInfo"].get("state") == "Pending Authorization" \ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated the code.
16e32c1
into
cisco-en-programmability:main
Type of Change
Description
As per the requirement, the device must be set to an authorized status by enabling the authorization flag in the playbook. For this implementation, the authorization should be set for devices that are in the "Pending Authorization" state.
Enhancement:
Set the authorization flag for devices.
Enhancement Description:
Previously, devices were not authorized after being claimed. With this enhancement, devices can be provisioned once authorized.
Impact Area:
This change affects the device add and claim process, as well as scenarios where devices are only claimed or have already been claimed.
Testing Done:
Test cases covered: [Mention test case IDs or brief points]
Checklist
Ansible Best Practices
ansible-vault
or environment variables)Documentation
Screenshots (if applicable)
Notes to Reviewers