diff --git a/netbox_acls/api/serializers.py b/netbox_acls/api/serializers.py index 39cd253a..56102748 100644 --- a/netbox_acls/api/serializers.py +++ b/netbox_acls/api/serializers.py @@ -5,13 +5,16 @@ from django.contrib.contenttypes.models import ContentType from drf_spectacular.utils import extend_schema_field -from ipam.api.serializers import PrefixSerializer from netbox.api.fields import ContentTypeField from netbox.api.serializers import NetBoxModelSerializer from rest_framework import serializers from utilities.api import get_serializer_for_model -from ..constants import ACL_HOST_ASSIGNMENT_MODELS, ACL_INTERFACE_ASSIGNMENT_MODELS +from ..constants import ( + ACL_HOST_ASSIGNMENT_MODELS, + ACL_INTERFACE_ASSIGNMENT_MODELS, + ACL_RULE_SOURCE_DESTINATION_MODELS, +) from ..models import ( AccessList, ACLExtendedRule, @@ -28,8 +31,8 @@ # Sets a standard error message for ACL rules with an action of remark, but no remark set. error_message_no_remark = "Action is set to remark, you MUST add a remark." -# Sets a standard error message for ACL rules with an action of remark, but no source_prefix is set. -error_message_action_remark_source_prefix_set = "Action is set to remark, Source Prefix CANNOT be set." +# Sets a standard error message for ACL rules with an action of remark, but no source is set. +error_message_action_remark_source_set = "Action is set to remark, Source CANNOT be set." # Sets a standard error message for ACL rules with an action not set to remark, but no remark is set. error_message_remark_without_action_remark = "CANNOT set remark unless action is set to remark." # Sets a standard error message for ACL rules no associated with an ACL of the same type. @@ -186,12 +189,18 @@ class ACLStandardRuleSerializer(NetBoxModelSerializer): view_name="plugins-api:netbox_acls-api:aclstandardrule-detail", ) access_list = AccessListSerializer(nested=True, required=True) - source_prefix = PrefixSerializer( - nested=True, + source_type = ContentTypeField( + queryset=ContentType.objects.filter(ACL_RULE_SOURCE_DESTINATION_MODELS), required=False, + default=None, allow_null=True, + ) + source_id = serializers.IntegerField( + required=False, default=None, + allow_null=True, ) + source = serializers.SerializerMethodField(read_only=True) class Meta: """ @@ -207,20 +216,36 @@ class Meta: "index", "action", "remark", - "source_prefix", + "source_type", + "source_id", + "source", "description", "tags", "created", "custom_fields", "last_updated", ) - brief_fields = ("id", "url", "display", "access_list", "index") + brief_fields = ( + "id", + "url", + "display", + "access_list", + "index", + ) + + @extend_schema_field(serializers.JSONField(allow_null=True)) + def get_source(self, obj): + if obj.source_id is None: + return None + serializer = get_serializer_for_model(obj.source) + context = {"request": self.context["request"]} + return serializer(obj.source, nested=True, context=context).data def validate(self, data): """ Validate the ACLStandardRule django model's inputs before allowing it to update the instance: - Check if action set to remark, but no remark set. - - Check if action set to remark, but source_prefix set. + - Check if action set to remark, but source set. """ error_message = {} @@ -230,10 +255,10 @@ def validate(self, data): error_message["remark"] = [ error_message_no_remark, ] - # Check if action set to remark, but source_prefix set. - if data.get("source_prefix"): - error_message["source_prefix"] = [ - error_message_action_remark_source_prefix_set, + # Check if action set to remark, but the source set. + if data.get("source"): + error_message["source"] = [ + error_message_action_remark_source_set, ] if error_message: @@ -251,18 +276,30 @@ class ACLExtendedRuleSerializer(NetBoxModelSerializer): view_name="plugins-api:netbox_acls-api:aclextendedrule-detail", ) access_list = AccessListSerializer(nested=True, required=True) - source_prefix = PrefixSerializer( - nested=True, + source_type = ContentTypeField( + queryset=ContentType.objects.filter(ACL_RULE_SOURCE_DESTINATION_MODELS), required=False, + default=None, allow_null=True, + ) + source_id = serializers.IntegerField( + required=False, default=None, + allow_null=True, ) - destination_prefix = PrefixSerializer( - nested=True, + source = serializers.SerializerMethodField(read_only=True) + destination_type = ContentTypeField( + queryset=ContentType.objects.filter(ACL_RULE_SOURCE_DESTINATION_MODELS), required=False, + default=None, allow_null=True, + ) + destination_id = serializers.IntegerField( + required=False, default=None, + allow_null=True, ) + destination = serializers.SerializerMethodField(read_only=True) class Meta: """ @@ -279,9 +316,13 @@ class Meta: "action", "remark", "protocol", - "source_prefix", + "source_type", + "source_id", + "source", "source_ports", - "destination_prefix", + "destination_type", + "destination_id", + "destination", "destination_ports", "description", "tags", @@ -289,15 +330,37 @@ class Meta: "custom_fields", "last_updated", ) - brief_fields = ("id", "url", "display", "access_list", "index") + brief_fields = ( + "id", + "url", + "display", + "access_list", + "index", + ) + + @extend_schema_field(serializers.JSONField(allow_null=True)) + def get_source(self, obj): + if obj.source_id is None: + return None + serializer = get_serializer_for_model(obj.source) + context = {"request": self.context["request"]} + return serializer(obj.source, nested=True, context=context).data + + @extend_schema_field(serializers.JSONField(allow_null=True)) + def get_destination(self, obj): + if obj.destination_id is None: + return None + serializer = get_serializer_for_model(obj.destination) + context = {"request": self.context["request"]} + return serializer(obj.destination, nested=True, context=context).data def validate(self, data): """ Validate the ACLExtendedRule django model's inputs before allowing it to update the instance: - Check if action set to remark, but no remark set. - - Check if action set to remark, but source_prefix set. + - Check if action set to remark, but source set. - Check if action set to remark, but source_ports set. - - Check if action set to remark, but destination_prefix set. + - Check if action set to remark, but destination set. - Check if action set to remark, but destination_ports set. - Check if action set to remark, but protocol set. - Check if action set to remark, but protocol set. @@ -310,19 +373,19 @@ def validate(self, data): error_message["remark"] = [ error_message_no_remark, ] - # Check if action set to remark, but source_prefix set. - if data.get("source_prefix"): - error_message["source_prefix"] = [ - error_message_action_remark_source_prefix_set, + # Check if action set to remark, but the source set. + if data.get("source"): + error_message["source"] = [ + error_message_action_remark_source_set, ] # Check if action set to remark, but source_ports set. if data.get("source_ports"): error_message["source_ports"] = [ "Action is set to remark, Source Ports CANNOT be set.", ] - # Check if action set to remark, but destination_prefix set. - if data.get("destination_prefix"): - error_message["destination_prefix"] = [ + # Check if action set to remark, but destination set. + if data.get("destination"): + error_message["destination"] = [ "Action is set to remark, Destination Prefix CANNOT be set.", ] # Check if action set to remark, but destination_ports set. diff --git a/netbox_acls/api/views.py b/netbox_acls/api/views.py index cdff48d1..5c7c17b2 100644 --- a/netbox_acls/api/views.py +++ b/netbox_acls/api/views.py @@ -25,7 +25,7 @@ class AccessListViewSet(NetBoxModelViewSet): """ - Defines the view set for the django AccessList model & associates it to a view. + Defines the view set for the django AccessList model and associates it with a view. """ queryset = ( @@ -41,7 +41,7 @@ class AccessListViewSet(NetBoxModelViewSet): class ACLInterfaceAssignmentViewSet(NetBoxModelViewSet): """ - Defines the view set for the django ACLInterfaceAssignment model & associates it to a view. + Defines the view set for the django ACLInterfaceAssignment model and associates it with a view. """ queryset = models.ACLInterfaceAssignment.objects.prefetch_related( @@ -54,13 +54,13 @@ class ACLInterfaceAssignmentViewSet(NetBoxModelViewSet): class ACLStandardRuleViewSet(NetBoxModelViewSet): """ - Defines the view set for the django ACLStandardRule model & associates it to a view. + Defines the view set for the django ACLStandardRule model and associates it with a view. """ queryset = models.ACLStandardRule.objects.prefetch_related( "access_list", + "source", "tags", - "source_prefix", ) serializer_class = ACLStandardRuleSerializer filterset_class = filtersets.ACLStandardRuleFilterSet @@ -68,14 +68,14 @@ class ACLStandardRuleViewSet(NetBoxModelViewSet): class ACLExtendedRuleViewSet(NetBoxModelViewSet): """ - Defines the view set for the django ACLExtendedRule model & associates it to a view. + Defines the view set for the django ACLExtendedRule model and associates it with a view. """ queryset = models.ACLExtendedRule.objects.prefetch_related( "access_list", + "source", + "destination", "tags", - "source_prefix", - "destination_prefix", ) serializer_class = ACLExtendedRuleSerializer filterset_class = filtersets.ACLExtendedRuleFilterSet diff --git a/netbox_acls/choices.py b/netbox_acls/choices.py index 4b1c5b2a..19793589 100644 --- a/netbox_acls/choices.py +++ b/netbox_acls/choices.py @@ -16,7 +16,7 @@ class ACLActionChoices(ChoiceSet): """ - Defines the choices availble for the Access Lists plugin specific to ACL default_action. + Defines the choices available for the Access Lists plugin specific to ACL default_action. """ ACTION_DENY = "deny" @@ -32,7 +32,7 @@ class ACLActionChoices(ChoiceSet): class ACLRuleActionChoices(ChoiceSet): """ - Defines the choices availble for the Access Lists plugin specific to ACL rule actions. + Defines the choices available for the Access Lists plugin specific to ACL rule actions. """ ACTION_DENY = "deny" @@ -62,7 +62,7 @@ class ACLAssignmentDirectionChoices(ChoiceSet): class ACLTypeChoices(ChoiceSet): """ - Defines the choices availble for the Access Lists plugin specific to ACL type. + Defines the choices available for the Access Lists plugin specific to ACL type. """ TYPE_STANDARD = "standard" @@ -76,7 +76,7 @@ class ACLTypeChoices(ChoiceSet): class ACLProtocolChoices(ChoiceSet): """ - Defines the choices availble for the Access Lists plugin specific to ACL Rule protocol. + Defines the choices available for the Access Lists plugin specific to ACL Rule protocol. """ PROTOCOL_ICMP = "icmp" diff --git a/netbox_acls/constants.py b/netbox_acls/constants.py index 468845c2..07b9ea00 100644 --- a/netbox_acls/constants.py +++ b/netbox_acls/constants.py @@ -1,14 +1,39 @@ """ Constants for filters """ + from django.db.models import Q +# +# AccessList +# + ACL_HOST_ASSIGNMENT_MODELS = Q( Q(app_label="dcim", model="device") | Q(app_label="dcim", model="virtualchassis") | Q(app_label="virtualization", model="virtualmachine"), ) +# +# ACLInterfaceAssignment +# + ACL_INTERFACE_ASSIGNMENT_MODELS = Q( Q(app_label="dcim", model="interface") | Q(app_label="virtualization", model="vminterface"), ) + +# +# AccessList Rule +# + +ACL_RULE_SOURCE_DESTINATION_MODELS = Q( + Q( + app_label="ipam", + model__in=( + "aggregate", + "ipaddress", + "iprange", + "prefix", + ), + ) +) diff --git a/netbox_acls/filtersets.py b/netbox_acls/filtersets.py index 45b860dc..e90bcab2 100644 --- a/netbox_acls/filtersets.py +++ b/netbox_acls/filtersets.py @@ -7,8 +7,9 @@ from dcim.models import Device, Interface, Region, Site, SiteGroup, VirtualChassis from django.db.models import Q from django.utils.translation import gettext_lazy as _ -from ipam.models import Prefix +from ipam.models import Aggregate, IPAddress, IPRange, Prefix from netbox.filtersets import NetBoxModelFilterSet +from utilities.filters import ContentTypeFilter, NumericArrayFilter from virtualization.models import VirtualMachine, VMInterface from .choices import ACLTypeChoices @@ -203,14 +204,53 @@ class ACLStandardRuleFilterSet(NetBoxModelFilterSet): ) # Source + source_type = ContentTypeFilter( + label=_("Source Type"), + ) + source_aggregate = django_filters.ModelMultipleChoiceFilter( + field_name="_source_aggregate__prefix", + queryset=Aggregate.objects.all(), + to_field_name="prefix", + label=_("Source Aggregate (name)"), + ) + source_aggregate_id = django_filters.ModelMultipleChoiceFilter( + field_name="_source_aggregate", + queryset=Aggregate.objects.all(), + to_field_name="id", + label=_("Source Aggregate (ID)"), + ) + source_ipaddress = django_filters.ModelMultipleChoiceFilter( + field_name="_source_ipaddress__address", + queryset=IPAddress.objects.all(), + to_field_name="address", + label=_("Source IP-Address (name)"), + ) + source_ipaddress_id = django_filters.ModelMultipleChoiceFilter( + field_name="_source_ipaddress", + queryset=IPAddress.objects.all(), + to_field_name="id", + label=_("Source IP-Address (ID)"), + ) + source_iprange = django_filters.ModelMultipleChoiceFilter( + field_name="_source_iprange__start_address", + queryset=IPRange.objects.all(), + to_field_name="start_address", + label=_("Source IP-Range (name)"), + ) + source_iprange_id = django_filters.ModelMultipleChoiceFilter( + field_name="_source_iprange", + queryset=IPRange.objects.all(), + to_field_name="id", + label=_("Source IP-Range (ID)"), + ) source_prefix = django_filters.ModelMultipleChoiceFilter( - field_name="source_prefix", + field_name="_source_prefix__prefix", queryset=Prefix.objects.all(), - to_field_name="name", + to_field_name="prefix", label=_("Source Prefix (name)"), ) source_prefix_id = django_filters.ModelMultipleChoiceFilter( - field_name="source_prefix", + field_name="_source_prefix", queryset=Prefix.objects.all(), to_field_name="id", label=_("Source Prefix (ID)"), @@ -222,17 +262,21 @@ class Meta: """ model = ACLStandardRule - fields = ("id", "access_list", "index", "action") + fields = ( + "id", + "access_list", + "index", + "action", + "remark", + "source_type", + "source_id", + ) def search(self, queryset, name, value): """ Override the default search behavior for the django model. """ - query = ( - Q(access_list__name__icontains=value) - | Q(index__icontains=value) - | Q(action__icontains=value) - ) + query = Q(access_list__name__icontains=value) | Q(index__icontains=value) | Q(action__icontains=value) return queryset.filter(query) @@ -254,32 +298,120 @@ class ACLExtendedRuleFilterSet(NetBoxModelFilterSet): ) # Source + source_type = ContentTypeFilter( + label=_("Source Type"), + ) + source_aggregate = django_filters.ModelMultipleChoiceFilter( + field_name="_source_aggregate__prefix", + queryset=Aggregate.objects.all(), + to_field_name="prefix", + label=_("Source Aggregate (name)"), + ) + source_aggregate_id = django_filters.ModelMultipleChoiceFilter( + field_name="_source_aggregate", + queryset=Aggregate.objects.all(), + to_field_name="id", + label=_("Source Aggregate (ID)"), + ) + source_ipaddress = django_filters.ModelMultipleChoiceFilter( + field_name="_source_ipaddress__address", + queryset=IPAddress.objects.all(), + to_field_name="address", + label=_("Source IP-Address (name)"), + ) + source_ipaddress_id = django_filters.ModelMultipleChoiceFilter( + field_name="_source_ipaddress", + queryset=IPAddress.objects.all(), + to_field_name="id", + label=_("Source IP-Address (ID)"), + ) + source_iprange = django_filters.ModelMultipleChoiceFilter( + field_name="_source_iprange__start_address", + queryset=IPRange.objects.all(), + to_field_name="start_address", + label=_("Source IP-Range (name)"), + ) + source_iprange_id = django_filters.ModelMultipleChoiceFilter( + field_name="_source_iprange", + queryset=IPRange.objects.all(), + to_field_name="id", + label=_("Source IP-Range (ID)"), + ) source_prefix = django_filters.ModelMultipleChoiceFilter( - field_name="source_prefix", + field_name="_source_prefix__prefix", queryset=Prefix.objects.all(), - to_field_name="name", + to_field_name="prefix", label=_("Source Prefix (name)"), ) source_prefix_id = django_filters.ModelMultipleChoiceFilter( - field_name="source_prefix", + field_name="_source_prefix", queryset=Prefix.objects.all(), to_field_name="id", label=_("Source Prefix (ID)"), ) + source_port = NumericArrayFilter( + field_name="source_ports", + lookup_expr="contains", + label=_("Source Port"), + ) # Destination + destination_type = ContentTypeFilter( + label=_("Destination Type"), + ) + destination_aggregate = django_filters.ModelMultipleChoiceFilter( + field_name="_destination_aggregate__prefix", + queryset=Aggregate.objects.all(), + to_field_name="prefix", + label=_("Destination Aggregate (name)"), + ) + destination_aggregate_id = django_filters.ModelMultipleChoiceFilter( + field_name="_destination_aggregate", + queryset=Aggregate.objects.all(), + to_field_name="id", + label=_("Destination Aggregate (ID)"), + ) + destination_ipaddress = django_filters.ModelMultipleChoiceFilter( + field_name="_destination_ipaddress__address", + queryset=IPAddress.objects.all(), + to_field_name="address", + label=_("Destination IP-Address (name)"), + ) + destination_ipaddress_id = django_filters.ModelMultipleChoiceFilter( + field_name="_destination_ipaddress", + queryset=IPAddress.objects.all(), + to_field_name="id", + label=_("Destination IP-Address (ID)"), + ) + destination_iprange = django_filters.ModelMultipleChoiceFilter( + field_name="_destination_iprange__start_address", + queryset=IPRange.objects.all(), + to_field_name="start_address", + label=_("Destination IP-Range (name)"), + ) + destination_iprange_id = django_filters.ModelMultipleChoiceFilter( + field_name="_destination_iprange", + queryset=IPRange.objects.all(), + to_field_name="id", + label=_("Destination IP-Range (ID)"), + ) destination_prefix = django_filters.ModelMultipleChoiceFilter( - field_name="destination_prefix", + field_name="_destination_prefix__prefix", queryset=Prefix.objects.all(), - to_field_name="name", + to_field_name="prefix", label=_("Destination Prefix (name)"), ) destination_prefix_id = django_filters.ModelMultipleChoiceFilter( - field_name="destination_prefix", + field_name="_destination_prefix", queryset=Prefix.objects.all(), to_field_name="id", label=_("Destination Prefix (ID)"), ) + destination_port = NumericArrayFilter( + field_name="destination_ports", + lookup_expr="contains", + label=_("Destination Port"), + ) class Meta: """ @@ -287,7 +419,20 @@ class Meta: """ model = ACLExtendedRule - fields = ("id", "access_list", "index", "action", "protocol") + fields = ( + "id", + "access_list", + "index", + "action", + "remark", + "source_type", + "source_id", + "source_port", + "destination_type", + "destination_id", + "destination_port", + "protocol", + ) def search(self, queryset, name, value): """ diff --git a/netbox_acls/forms/filtersets.py b/netbox_acls/forms/filtersets.py index 6998144a..c1392f97 100644 --- a/netbox_acls/forms/filtersets.py +++ b/netbox_acls/forms/filtersets.py @@ -5,7 +5,7 @@ from dcim.models import Device, Interface, Region, Site, SiteGroup, VirtualChassis from django import forms from django.utils.translation import gettext_lazy as _ -from ipam.models import Prefix +from ipam.models import Aggregate, IPAddress, IPRange, Prefix from netbox.forms import NetBoxModelFilterSetForm from utilities.forms.fields import ( DynamicModelChoiceField, @@ -45,17 +45,38 @@ class AccessListFilterForm(NetBoxModelFilterSetForm): model = AccessList fieldsets = ( - FieldSet("q", "tag", name=None), - FieldSet("type", "default_action", name=_("ACL Details")), - FieldSet("region_id", "site_group_id", "site_id", "device_id", name=_("Device Details")), - FieldSet("virtual_chassis_id", name=_("Virtual Chassis Details")), - FieldSet("virtual_machine_id", name=_("Virtual Machine Details")), + FieldSet( + "q", + "tag", + name=None, + ), + FieldSet( + "type", + "default_action", + name=_("ACL Details"), + ), + FieldSet( + "region_id", + "site_group_id", + "site_id", + "device_id", + name=_("Device Details"), + ), + FieldSet( + "virtual_chassis_id", + name=_("Virtual Chassis Details"), + ), + FieldSet( + "virtual_machine_id", + name=_("Virtual Machine Details"), + ), ) - # ACL + # ACL selector type = forms.ChoiceField( choices=add_blank_choice(ACLTypeChoices), required=False, + label=_("Type"), ) default_action = forms.ChoiceField( choices=add_blank_choice(ACLActionChoices), @@ -119,10 +140,29 @@ class ACLInterfaceAssignmentFilterForm(NetBoxModelFilterSetForm): model = ACLInterfaceAssignment fieldsets = ( - FieldSet("q", "tag", name=None), - FieldSet("access_list_id", "direction", name=_("ACL Details")), - FieldSet("region_id", "site_group_id", "site_id", "device_id", "interface_id", name=_("Device Details")), - FieldSet("virtual_machine_id", "vminterface_id", name=_("Virtual Machine Details")), + FieldSet( + "q", + "tag", + name=None, + ), + FieldSet( + "access_list_id", + "direction", + name=_("ACL Details"), + ), + FieldSet( + "region_id", + "site_group_id", + "site_id", + "device_id", + "interface_id", + name=_("Device Details"), + ), + FieldSet( + "virtual_machine_id", + "vminterface_id", + name=_("Virtual Machine Details"), + ), ) # ACL selector @@ -202,9 +242,25 @@ class ACLStandardRuleFilterForm(NetBoxModelFilterSetForm): model = ACLStandardRule fieldsets = ( - FieldSet("q", "tag", name=None), - FieldSet("access_list_id", "index", "action", name=_("ACL Details")), - FieldSet("source_prefix_id", name=_("Source Details")), + FieldSet( + "q", + "tag", + name=None, + ), + FieldSet( + "access_list_id", + "index", + "action", + "remark", + name=_("ACL Details"), + ), + FieldSet( + "source_aggregate_id", + "source_ipaddress_id", + "source_iprange_id", + "source_prefix_id", + name=_("Source Details"), + ), ) access_list_id = DynamicModelMultipleChoiceField( @@ -224,8 +280,27 @@ class ACLStandardRuleFilterForm(NetBoxModelFilterSetForm): required=False, label=_("Action"), ) + remark = forms.CharField( + required=False, + label=_("Remark"), + ) # Source selectors + source_aggregate_id = DynamicModelMultipleChoiceField( + queryset=Aggregate.objects.all(), + required=False, + label=_("Source Aggregate"), + ) + source_ipaddress_id = DynamicModelMultipleChoiceField( + queryset=IPAddress.objects.all(), + required=False, + label=_("Source IP-Address"), + ) + source_iprange_id = DynamicModelMultipleChoiceField( + queryset=IPRange.objects.all(), + required=False, + label=_("Source IP-Range"), + ) source_prefix_id = DynamicModelMultipleChoiceField( queryset=Prefix.objects.all(), required=False, @@ -243,10 +318,35 @@ class ACLExtendedRuleFilterForm(NetBoxModelFilterSetForm): model = ACLExtendedRule fieldsets = ( - FieldSet("q", "tag", name=None), - FieldSet("access_list_id", "index", "action", "protocol", name=_("ACL Details")), - FieldSet("source_prefix_id", name=_("Source Details")), - FieldSet("destination_prefix_id", name=_("Destination Details")), + FieldSet( + "q", + "tag", + name=None, + ), + FieldSet( + "access_list_id", + "index", + "action", + "remark", + "protocol", + name=_("ACL Details"), + ), + FieldSet( + "source_aggregate_id", + "source_ipaddress_id", + "source_iprange_id", + "source_prefix_id", + "source_port", + name=_("Source Details"), + ), + FieldSet( + "destination_aggregate_id", + "destination_ipaddress_id", + "destination_iprange_id", + "destination_prefix_id", + "destination_port", + name=_("Destination Details"), + ), ) access_list_id = DynamicModelMultipleChoiceField( @@ -266,6 +366,10 @@ class ACLExtendedRuleFilterForm(NetBoxModelFilterSetForm): required=False, label=_("Action"), ) + remark = forms.CharField( + required=False, + label=_("Remark"), + ) protocol = forms.ChoiceField( choices=add_blank_choice(ACLProtocolChoices), required=False, @@ -273,18 +377,56 @@ class ACLExtendedRuleFilterForm(NetBoxModelFilterSetForm): ) # Source selectors + source_aggregate_id = DynamicModelMultipleChoiceField( + queryset=Aggregate.objects.all(), + required=False, + label=_("Source Aggregate"), + ) + source_ipaddress_id = DynamicModelMultipleChoiceField( + queryset=IPAddress.objects.all(), + required=False, + label=_("Source IP-Address"), + ) + source_iprange_id = DynamicModelMultipleChoiceField( + queryset=IPRange.objects.all(), + required=False, + label=_("Source IP-Range"), + ) source_prefix_id = DynamicModelMultipleChoiceField( queryset=Prefix.objects.all(), required=False, label=_("Source Prefix"), ) + source_port = forms.IntegerField( + label=_("Source Port"), + required=False, + ) # Destination selectors + destination_aggregate_id = DynamicModelMultipleChoiceField( + queryset=Aggregate.objects.all(), + required=False, + label=_("Destination Aggregate"), + ) + destination_ipaddress_id = DynamicModelMultipleChoiceField( + queryset=IPAddress.objects.all(), + required=False, + label=_("Destination IP-Address"), + ) + destination_iprange_id = DynamicModelMultipleChoiceField( + queryset=IPRange.objects.all(), + required=False, + label=_("Destination IP-Range"), + ) destination_prefix_id = DynamicModelMultipleChoiceField( queryset=Prefix.objects.all(), required=False, label=_("Destination Prefix"), ) + destination_port = forms.IntegerField( + label=_("Destination Port"), + required=False, + ) # Tag selector tag = TagFilterField(model) diff --git a/netbox_acls/forms/models.py b/netbox_acls/forms/models.py index 42450a70..6e16bc21 100644 --- a/netbox_acls/forms/models.py +++ b/netbox_acls/forms/models.py @@ -4,13 +4,22 @@ from dcim.models import Device, Interface, Region, Site, SiteGroup, VirtualChassis from django.contrib.contenttypes.models import ContentType -from django.core.exceptions import ValidationError +from django.core.exceptions import ObjectDoesNotExist, ValidationError from django.utils.safestring import mark_safe from django.utils.translation import gettext_lazy as _ from ipam.models import Prefix from netbox.forms import NetBoxModelForm -from utilities.forms.fields import CommentField, DynamicModelChoiceField +from utilities.forms import ( + get_field_value, +) +from utilities.forms.fields import ( + CommentField, + ContentTypeChoiceField, + DynamicModelChoiceField, +) from utilities.forms.rendering import FieldSet, TabbedGroups +from utilities.forms.widgets import HTMXSelect +from utilities.templatetags.builtins.filters import bettertitle from virtualization.models import ( Cluster, ClusterGroup, @@ -20,6 +29,7 @@ ) from ..choices import ACLTypeChoices +from ..constants import ACL_RULE_SOURCE_DESTINATION_MODELS from ..models import ( AccessList, ACLExtendedRule, @@ -225,7 +235,7 @@ def clean(self): "__all__": ( "Access Lists must be assigned to one host at a time. Either a device, virtual chassis or " "virtual machine." - ) + ), }, ) @@ -348,21 +358,6 @@ class ACLInterfaceAssignmentForm(NetBoxModelForm): ), ) - def __init__(self, *args, **kwargs): - # Initialize helper selectors - instance = kwargs.get("instance") - initial = kwargs.get("initial", {}).copy() - if instance: - if type(instance.assigned_object) is Interface: - initial["interface"] = instance.assigned_object - initial["device"] = "device" - elif type(instance.assigned_object) is VMInterface: - initial["vminterface"] = instance.assigned_object - initial["virtual_machine"] = "virtual_machine" - kwargs["initial"] = initial - - super().__init__(*args, **kwargs) - class Meta: model = ACLInterfaceAssignment fields = ( @@ -382,6 +377,21 @@ class Meta: ), } + def __init__(self, *args, **kwargs): + # Initialize helper selectors + instance = kwargs.get("instance") + initial = kwargs.get("initial", {}).copy() + if instance: + if type(instance.assigned_object) is Interface: + initial["interface"] = instance.assigned_object + initial["device"] = "device" + elif type(instance.assigned_object) is VMInterface: + initial["vminterface"] = instance.assigned_object + initial["virtual_machine"] = "virtual_machine" + kwargs["initial"] = initial + + super().__init__(*args, **kwargs) + def clean(self): """ Validates form inputs before submitting: @@ -495,11 +505,22 @@ class ACLStandardRuleForm(NetBoxModelForm): ), label="Access List", ) - source_prefix = DynamicModelChoiceField( - queryset=Prefix.objects.all(), + + # Source + source_type = ContentTypeChoiceField( + queryset=ContentType.objects.filter(ACL_RULE_SOURCE_DESTINATION_MODELS), required=False, + widget=HTMXSelect(), + label=_("Source Type"), help_text=help_text_acl_rule_logic, - label="Source Prefix", + ) + source = DynamicModelChoiceField( + queryset=Prefix.objects.none(), # Initial queryset + selector=True, + required=False, + label=_("Source"), + help_text=help_text_acl_rule_logic, + disabled=True, ) fieldsets = ( @@ -512,10 +533,17 @@ class ACLStandardRuleForm(NetBoxModelForm): FieldSet( "index", "action", - "remark", - "source_prefix", name=_("Rule Definition"), ), + FieldSet( + "remark", + name=_("Remark"), + ), + FieldSet( + "source_type", + "source", + name=_("Source Definition"), + ), ) class Meta: @@ -525,7 +553,7 @@ class Meta: "index", "action", "remark", - "source_prefix", + "source_type", "tags", "description", ) @@ -534,10 +562,54 @@ class Meta: "index": help_text_acl_rule_index, "action": help_text_acl_action, "remark": mark_safe( - "*Note: CANNOT be set if source prefix OR action is set.", + "*Note: CANNOT be set if source OR action is set.", ), } + def __init__(self, *args, **kwargs) -> None: + """ + Initialize the ACLStandardRuleForm. + """ + + # Initialize fields with initial values + instance = kwargs.get("instance") + initial = kwargs.get("initial", {}).copy() + + if instance is not None and instance.source: + # Initialize the source object field + initial["source"] = instance.source + + kwargs["initial"] = initial + + super().__init__(*args, **kwargs) + + if source_type_id := get_field_value(self, "source_type"): + try: + # Retrieve the ContentType model class based on the source type + source_type = ContentType.objects.get(pk=source_type_id) + source_model = source_type.model_class() + + # Configure the queryset and label for the source field + self.fields["source"].queryset = source_model.objects.all() + self.fields["source"].widget.attrs["selector"] = source_model._meta.label_lower + self.fields["source"].disabled = False + self.fields["source"].label = _("Source " + bettertitle(source_model._meta.verbose_name)) + except ObjectDoesNotExist: + pass + + # Clears the source field if the selected type changes + if self.instance and self.instance.pk and source_type_id != self.instance.source_type_id: + self.initial["source"] = None + + def clean(self): + """ + Validate form fields for the ACL Standard Rule form. + """ + super().clean() + + # Ensure the selected source object gets assigned + self.instance.source = self.cleaned_data.get("source") + class ACLExtendedRuleForm(NetBoxModelForm): """ @@ -557,18 +629,40 @@ class ACLExtendedRuleForm(NetBoxModelForm): label="Access List", ) - source_prefix = DynamicModelChoiceField( - queryset=Prefix.objects.all(), + # Source + source_type = ContentTypeChoiceField( + queryset=ContentType.objects.filter(ACL_RULE_SOURCE_DESTINATION_MODELS), required=False, + widget=HTMXSelect(), + label=_("Source Type"), help_text=help_text_acl_rule_logic, - label="Source Prefix", ) - destination_prefix = DynamicModelChoiceField( - queryset=Prefix.objects.all(), + source = DynamicModelChoiceField( + queryset=Prefix.objects.none(), # Initial queryset + selector=True, required=False, + label=_("Source"), help_text=help_text_acl_rule_logic, - label="Destination Prefix", + disabled=True, ) + + # Destination + destination_type = ContentTypeChoiceField( + queryset=ContentType.objects.filter(ACL_RULE_SOURCE_DESTINATION_MODELS), + required=False, + widget=HTMXSelect(), + label=_("Destination Type"), + help_text=help_text_acl_rule_logic, + ) + destination = DynamicModelChoiceField( + queryset=Prefix.objects.none(), # Initial queryset + selector=True, + required=False, + label=_("Destination"), + help_text=help_text_acl_rule_logic, + disabled=True, + ) + fieldsets = ( FieldSet( "access_list", @@ -579,13 +673,27 @@ class ACLExtendedRuleForm(NetBoxModelForm): FieldSet( "index", "action", + name=_("Rule Definition"), + ), + FieldSet( "remark", - "source_prefix", + name=_("Remark"), + ), + FieldSet( + "protocol", + name=_("Protocol"), + ), + FieldSet( + "source_type", + "source", "source_ports", - "destination_prefix", + name=_("Source Definition"), + ), + FieldSet( + "destination_type", + "destination", "destination_ports", - "protocol", - name=_("Rule Definition"), + name=_("Destination Definition"), ), ) @@ -596,9 +704,9 @@ class Meta: "index", "action", "remark", - "source_prefix", + "source_type", "source_ports", - "destination_prefix", + "destination_type", "destination_ports", "protocol", "tags", @@ -615,3 +723,73 @@ class Meta: ), "source_ports": help_text_acl_rule_logic, } + + def __init__(self, *args, **kwargs) -> None: + """ + Initialize the ACLExtendedRuleForm. + """ + + # Initialize fields with initial values + instance = kwargs.get("instance") + initial = kwargs.get("initial", {}).copy() + + if instance is not None and instance.source: + # Initialize the source object field + initial["source"] = instance.source + if instance is not None and instance.destination: + # Initialize the destination object field + initial["destination"] = instance.destination + + kwargs["initial"] = initial + + super().__init__(*args, **kwargs) + + # Source + if source_type_id := get_field_value(self, "source_type"): + try: + # Retrieve the ContentType model class based on the source type + source_type = ContentType.objects.get(pk=source_type_id) + source_model = source_type.model_class() + + # Configure the queryset and label for the source field + self.fields["source"].queryset = source_model.objects.all() + self.fields["source"].widget.attrs["selector"] = source_model._meta.label_lower + self.fields["source"].disabled = False + self.fields["source"].label = _("Source " + bettertitle(source_model._meta.verbose_name)) + except ObjectDoesNotExist: + pass + + # Clears the source field if the selected type changes + if self.instance and self.instance.pk and source_type_id != self.instance.source_type_id: + self.initial["source"] = None + + # Destination + if destination_type_id := get_field_value(self, "destination_type"): + try: + # Retrieve the ContentType model class based on the destination type + destination_type = ContentType.objects.get(pk=destination_type_id) + destination_model = destination_type.model_class() + + # Configure the queryset and label for the destination field + self.fields["destination"].queryset = destination_model.objects.all() + self.fields["destination"].widget.attrs["selector"] = destination_model._meta.label_lower + self.fields["destination"].disabled = False + self.fields["destination"].label = _("Destination " + bettertitle(destination_model._meta.verbose_name)) + except ObjectDoesNotExist: + pass + + # Clears the destination field if the selected type changes + if self.instance and self.instance.pk and destination_type_id != self.instance.destination_type_id: + self.initial["destination"] = None + + def clean(self): + """ + Validate form fields for the ACL Extended Rule form. + """ + super().clean() + + # Ensure the selected source object gets assigned + self.instance.source = self.cleaned_data.get("source") + + # Ensure the selected destination object gets assigned + self.instance.destination = self.cleaned_data.get("destination") diff --git a/netbox_acls/graphql/filters/access_list_rules.py b/netbox_acls/graphql/filters/access_list_rules.py index be1e0b2d..50be41db 100644 --- a/netbox_acls/graphql/filters/access_list_rules.py +++ b/netbox_acls/graphql/filters/access_list_rules.py @@ -3,6 +3,7 @@ import strawberry import strawberry_django +from core.graphql.filters import ContentTypeFilter from netbox.graphql.filter_mixins import NetBoxModelFilterMixin from strawberry.scalars import ID from strawberry_django import FilterLookup @@ -10,7 +11,6 @@ from ... import models if TYPE_CHECKING: - from ipam.graphql.filters import PrefixFilter from netbox.graphql.filter_lookups import IntegerArrayLookup, IntegerLookup from ..enums import ( @@ -39,14 +39,19 @@ class ACLRuleFilterMixin(NetBoxModelFilterMixin): index: Annotated["IntegerLookup", strawberry.lazy("netbox.graphql.filter_lookups")] | None = ( strawberry_django.filter_field() ) - remark: FilterLookup[str] | None = strawberry_django.filter_field() description: FilterLookup[str] | None = strawberry_django.filter_field() action: Annotated["ACLRuleActionEnum", strawberry.lazy("netbox_acls.graphql.enums")] | None = ( strawberry_django.filter_field() ) - source_prefix: Annotated["PrefixFilter", strawberry.lazy("ipam.graphql.filters")] | None = ( + + # Remark + remark: FilterLookup[str] | None = strawberry_django.filter_field() + + # Source + source_type: Annotated["ContentTypeFilter", strawberry.lazy("core.graphql.filters")] | None = ( strawberry_django.filter_field() ) + source_id: ID | None = strawberry_django.filter_field() @strawberry_django.filter(models.ACLStandardRule, lookups=True) @@ -64,15 +69,21 @@ class ACLExtendedRuleFilter(ACLRuleFilterMixin): GraphQL filter definition for the ACLExtendedRule model. """ + # Source source_ports: Annotated["IntegerArrayLookup", strawberry.lazy("netbox.graphql.filter_lookups")] | None = ( strawberry_django.filter_field() ) - destination_prefix: Annotated["PrefixFilter", strawberry.lazy("ipam.graphql.filters")] | None = ( + + # Destination + destination_type: Annotated["ContentTypeFilter", strawberry.lazy("core.graphql.filters")] | None = ( strawberry_django.filter_field() ) + destination_id: ID | None = strawberry_django.filter_field() destination_ports: Annotated["IntegerArrayLookup", strawberry.lazy("netbox.graphql.filter_lookups")] | None = ( strawberry_django.filter_field() ) + + # Protocol protocol: Annotated["ACLProtocolEnum", strawberry.lazy("netbox_acls.graphql.enums")] | None = ( strawberry_django.filter_field() ) diff --git a/netbox_acls/graphql/types.py b/netbox_acls/graphql/types.py index d7888d56..18aad285 100644 --- a/netbox_acls/graphql/types.py +++ b/netbox_acls/graphql/types.py @@ -2,15 +2,20 @@ Define the object types and queries available via the graphql api. """ -from typing import Annotated, List, Union +from typing import Annotated, List, Union, TYPE_CHECKING import strawberry import strawberry_django -from netbox.graphql.types import NetBoxObjectType +from netbox.graphql.types import ContentTypeType, NetBoxObjectType from .. import models from . import filters +if TYPE_CHECKING: + from dcim.graphql.types import DeviceType, InterfaceType + from ipam.graphql.types import AggregateType, IPAddressType, IPRangeType, PrefixType + from virtualization.graphql.types import VirtualMachineType, VMInterfaceType + @strawberry_django.type( models.AccessList, @@ -74,6 +79,10 @@ class ACLInterfaceAssignmentType(NetBoxObjectType): @strawberry_django.type( models.ACLStandardRule, fields="__all__", + exclude=[ + "source_id", + "source_type", + ], filters=filters.ACLStandardRuleFilter, ) class ACLStandardRuleType(NetBoxObjectType): @@ -83,12 +92,48 @@ class ACLStandardRuleType(NetBoxObjectType): # Model fields access_list: Annotated["AccessListType", strawberry.lazy("netbox_acls.graphql.types")] - source_prefix: Annotated["PrefixType", strawberry.lazy("ipam.graphql.types")] | None + source_type: Annotated["ContentTypeType", strawberry.lazy("netbox.graphql.types")] | None + source: ( + Annotated[ + Union[ + Annotated[ + "AggregateType", + strawberry.lazy("ipam.graphql.types"), + ], + Annotated[ + "IPAddressType", + strawberry.lazy("ipam.graphql.types"), + ], + Annotated[ + "IPRangeType", + strawberry.lazy("ipam.graphql.types"), + ], + Annotated[ + "PrefixType", + strawberry.lazy("ipam.graphql.types"), + ], + ], + strawberry.union("ACLStandardRuleObjectType"), + ] + | None + ) + + # Cached related source objects + _source_aggregate: Annotated["AggregateType", strawberry.lazy("ipam.graphql.types")] | None + _source_ipaddress: Annotated["IPAddressType", strawberry.lazy("ipam.graphql.types")] | None + _source_iprange: Annotated["IPRangeType", strawberry.lazy("ipam.graphql.types")] | None + _source_prefix: Annotated["PrefixType", strawberry.lazy("ipam.graphql.types")] | None @strawberry_django.type( models.ACLExtendedRule, fields="__all__", + exclude=[ + "source_id", + "source_type", + "destination_id", + "destination_type", + ], filters=filters.ACLExtendedRuleFilter, ) class ACLExtendedRuleType(NetBoxObjectType): @@ -98,7 +143,67 @@ class ACLExtendedRuleType(NetBoxObjectType): # Model fields access_list: Annotated["AccessListType", strawberry.lazy("netbox_acls.graphql.types")] - source_prefix: Annotated["PrefixType", strawberry.lazy("ipam.graphql.types")] | None + source_type: Annotated["ContentTypeType", strawberry.lazy("netbox.graphql.types")] | None + source: ( + Annotated[ + Union[ + Annotated[ + "AggregateType", + strawberry.lazy("ipam.graphql.types"), + ], + Annotated[ + "IPAddressType", + strawberry.lazy("ipam.graphql.types"), + ], + Annotated[ + "IPRangeType", + strawberry.lazy("ipam.graphql.types"), + ], + Annotated[ + "PrefixType", + strawberry.lazy("ipam.graphql.types"), + ], + ], + strawberry.union("ACLStandardRuleObjectType"), + ] + | None + ) source_ports: List[int] | None - destination_prefix: Annotated["PrefixType", strawberry.lazy("ipam.graphql.types")] | None + destination_type: Annotated["ContentTypeType", strawberry.lazy("netbox.graphql.types")] | None + destination: ( + Annotated[ + Union[ + Annotated[ + "AggregateType", + strawberry.lazy("ipam.graphql.types"), + ], + Annotated[ + "IPAddressType", + strawberry.lazy("ipam.graphql.types"), + ], + Annotated[ + "IPRangeType", + strawberry.lazy("ipam.graphql.types"), + ], + Annotated[ + "PrefixType", + strawberry.lazy("ipam.graphql.types"), + ], + ], + strawberry.union("ACLStandardRuleObjectType"), + ] + | None + ) destination_ports: List[int] | None + + # Cached related source objects + _source_aggregate: Annotated["AggregateType", strawberry.lazy("ipam.graphql.types")] | None + _source_ipaddress: Annotated["IPAddressType", strawberry.lazy("ipam.graphql.types")] | None + _source_iprange: Annotated["IPRangeType", strawberry.lazy("ipam.graphql.types")] | None + _source_prefix: Annotated["PrefixType", strawberry.lazy("ipam.graphql.types")] | None + + # Cached related destination objects + _destination_aggregate: Annotated["AggregateType", strawberry.lazy("ipam.graphql.types")] | None + _destination_ipaddress: Annotated["IPAddressType", strawberry.lazy("ipam.graphql.types")] | None + _destination_iprange: Annotated["IPRangeType", strawberry.lazy("ipam.graphql.types")] | None + _destination_prefix: Annotated["PrefixType", strawberry.lazy("ipam.graphql.types")] | None diff --git a/netbox_acls/migrations/0005_acl_rule_source_and_destination_objects.py b/netbox_acls/migrations/0005_acl_rule_source_and_destination_objects.py new file mode 100644 index 00000000..52e32349 --- /dev/null +++ b/netbox_acls/migrations/0005_acl_rule_source_and_destination_objects.py @@ -0,0 +1,257 @@ +import django.db.models.deletion +from django.db import migrations, models + + +def copy_prefix_assignments(apps, schema_editor): + """ + Copy Source and Destination Prefix ForeignKey IDs to the GenericForeignKey + fields. + """ + + db_alias = schema_editor.connection.alias + ContentType = apps.get_model("contenttypes", "ContentType") + Prefix = apps.get_model("ipam", "Prefix") + ACLStandardRule = apps.get_model("netbox_acls", "ACLStandardRule") + ACLExtendedRule = apps.get_model("netbox_acls", "ACLExtendedRule") + + ACLStandardRule.objects.using(db_alias).filter(_source_prefix__isnull=False).update( + source_type=ContentType.objects.get_for_model(Prefix), + source_id=models.F("_source_prefix_id"), + ) + ACLExtendedRule.objects.using(db_alias).filter(_source_prefix__isnull=False).filter( + _destination_prefix__isnull=False + ).update( + source_type=ContentType.objects.get_for_model(Prefix), + source_id=models.F("_source_prefix_id"), + destination_type=ContentType.objects.get_for_model(Prefix), + destination_id=models.F("_destination_prefix_id"), + ) + + +class Migration(migrations.Migration): + dependencies = [ + ("contenttypes", "0002_remove_content_type_name"), + ("extras", "0128_tableconfig"), + ("ipam", "0081_remove_service_device_virtual_machine_add_parent_gfk_index"), + ("netbox_acls", "0004_netbox_acls"), + ] + + operations = [ + migrations.RenameField( + model_name="aclextendedrule", + old_name="destination_prefix", + new_name="_destination_prefix", + ), + migrations.RenameField( + model_name="aclextendedrule", + old_name="source_prefix", + new_name="_source_prefix", + ), + migrations.RenameField( + model_name="aclstandardrule", + old_name="source_prefix", + new_name="_source_prefix", + ), + migrations.AddField( + model_name="aclextendedrule", + name="_destination_aggregate", + field=models.ForeignKey( + blank=True, + null=True, + on_delete=django.db.models.deletion.PROTECT, + related_name="_%(class)s_destinations", + to="ipam.aggregate", + ), + ), + migrations.AddField( + model_name="aclextendedrule", + name="_destination_ipaddress", + field=models.ForeignKey( + blank=True, + null=True, + on_delete=django.db.models.deletion.PROTECT, + related_name="_%(class)s_destinations", + to="ipam.ipaddress", + ), + ), + migrations.AddField( + model_name="aclextendedrule", + name="_destination_iprange", + field=models.ForeignKey( + blank=True, + null=True, + on_delete=django.db.models.deletion.PROTECT, + related_name="_%(class)s_destinations", + to="ipam.iprange", + ), + ), + migrations.AddField( + model_name="aclextendedrule", + name="_source_aggregate", + field=models.ForeignKey( + blank=True, + null=True, + on_delete=django.db.models.deletion.PROTECT, + related_name="_%(class)s_sources", + to="ipam.aggregate", + ), + ), + migrations.AddField( + model_name="aclextendedrule", + name="_source_ipaddress", + field=models.ForeignKey( + blank=True, + null=True, + on_delete=django.db.models.deletion.PROTECT, + related_name="_%(class)s_sources", + to="ipam.ipaddress", + ), + ), + migrations.AddField( + model_name="aclextendedrule", + name="_source_iprange", + field=models.ForeignKey( + blank=True, + null=True, + on_delete=django.db.models.deletion.PROTECT, + related_name="_%(class)s_sources", + to="ipam.iprange", + ), + ), + migrations.AddField( + model_name="aclextendedrule", + name="destination_id", + field=models.PositiveBigIntegerField(blank=True, null=True), + ), + migrations.AddField( + model_name="aclextendedrule", + name="destination_type", + field=models.ForeignKey( + blank=True, + limit_choices_to=models.Q( + models.Q(("app_label", "ipam"), ("model__in", ("aggregate", "ipaddress", "iprange", "prefix"))) + ), + null=True, + on_delete=django.db.models.deletion.PROTECT, + related_name="+", + to="contenttypes.contenttype", + ), + ), + migrations.AddField( + model_name="aclextendedrule", + name="source_id", + field=models.PositiveBigIntegerField(blank=True, null=True), + ), + migrations.AddField( + model_name="aclextendedrule", + name="source_type", + field=models.ForeignKey( + blank=True, + limit_choices_to=models.Q( + models.Q(("app_label", "ipam"), ("model__in", ("aggregate", "ipaddress", "iprange", "prefix"))) + ), + null=True, + on_delete=django.db.models.deletion.PROTECT, + related_name="+", + to="contenttypes.contenttype", + ), + ), + migrations.AddField( + model_name="aclstandardrule", + name="_source_aggregate", + field=models.ForeignKey( + blank=True, + null=True, + on_delete=django.db.models.deletion.PROTECT, + related_name="_%(class)s_sources", + to="ipam.aggregate", + ), + ), + migrations.AddField( + model_name="aclstandardrule", + name="_source_ipaddress", + field=models.ForeignKey( + blank=True, + null=True, + on_delete=django.db.models.deletion.PROTECT, + related_name="_%(class)s_sources", + to="ipam.ipaddress", + ), + ), + migrations.AddField( + model_name="aclstandardrule", + name="_source_iprange", + field=models.ForeignKey( + blank=True, + null=True, + on_delete=django.db.models.deletion.PROTECT, + related_name="_%(class)s_sources", + to="ipam.iprange", + ), + ), + migrations.AddField( + model_name="aclstandardrule", + name="source_id", + field=models.PositiveBigIntegerField(blank=True, null=True), + ), + migrations.AddField( + model_name="aclstandardrule", + name="source_type", + field=models.ForeignKey( + blank=True, + limit_choices_to=models.Q( + models.Q(("app_label", "ipam"), ("model__in", ("aggregate", "ipaddress", "iprange", "prefix"))) + ), + null=True, + on_delete=django.db.models.deletion.PROTECT, + related_name="+", + to="contenttypes.contenttype", + ), + ), + migrations.AlterField( + model_name="aclextendedrule", + name="_destination_prefix", + field=models.ForeignKey( + blank=True, + null=True, + on_delete=django.db.models.deletion.PROTECT, + related_name="_%(class)s_destinations", + to="ipam.prefix", + ), + ), + migrations.AlterField( + model_name="aclextendedrule", + name="_source_prefix", + field=models.ForeignKey( + blank=True, + null=True, + on_delete=django.db.models.deletion.PROTECT, + related_name="_%(class)s_sources", + to="ipam.prefix", + ), + ), + migrations.AlterField( + model_name="aclstandardrule", + name="_source_prefix", + field=models.ForeignKey( + blank=True, + null=True, + on_delete=django.db.models.deletion.PROTECT, + related_name="_%(class)s_sources", + to="ipam.prefix", + ), + ), + migrations.AddIndex( + model_name="aclextendedrule", + index=models.Index( + fields=["destination_type", "destination_id", "source_type", "source_id"], + name="netbox_acls_destina_8f93b4_idx", + ), + ), + migrations.AddIndex( + model_name="aclstandardrule", + index=models.Index(fields=["source_type", "source_id"], name="netbox_acls_source__01d2fa_idx"), + ), + # Copy over existing Prefix assignments + migrations.RunPython(code=copy_prefix_assignments, reverse_code=migrations.RunPython.noop), + ] diff --git a/netbox_acls/models/access_list_rules.py b/netbox_acls/models/access_list_rules.py index c8a46c7b..d7b7b92a 100644 --- a/netbox_acls/models/access_list_rules.py +++ b/netbox_acls/models/access_list_rules.py @@ -2,14 +2,18 @@ Define the django models for this plugin. """ +from django.apps import apps +from django.contrib.contenttypes.fields import GenericForeignKey, GenericRelation from django.contrib.postgres.fields import ArrayField from django.core.exceptions import ValidationError from django.db import models from django.urls import reverse from django.utils.translation import gettext_lazy as _ +from ipam.models import Aggregate, IPAddress, IPRange, Prefix from netbox.models import NetBoxModel from ..choices import ACLProtocolChoices, ACLRuleActionChoices, ACLTypeChoices +from ..constants import ACL_RULE_SOURCE_DESTINATION_MODELS from .access_lists import AccessList __all__ = ( @@ -21,16 +25,14 @@ # Error message when the action is 'remark', but no remark is provided. ERROR_MESSAGE_NO_REMARK = _("When the action is 'remark', a remark is required.") -# Error message when the action is 'remark', but the source_prefix is set. -ERROR_MESSAGE_ACTION_REMARK_SOURCE_PREFIX_SET = _("When the action is 'remark', the Source Prefix must not be set.") +# Error message when the action is 'remark', but the source is set. +ERROR_MESSAGE_ACTION_REMARK_SOURCE_SET = _("When the action is 'remark', the Source must not be set.") # Error message when the action is 'remark', but the source_ports are set. ERROR_MESSAGE_ACTION_REMARK_SOURCE_PORTS_SET = _("When the action is 'remark', Source Ports must not be set.") -# Error message when the action is 'remark', but the destination_prefix is set. -ERROR_MESSAGE_ACTION_REMARK_DESTINATION_PREFIX_SET = _( - "When the action is 'remark', the Destination Prefix must not be set." -) +# Error message when the action is 'remark', but the destination is set. +ERROR_MESSAGE_ACTION_REMARK_DESTINATION_SET = _("When the action is 'remark', the Destination must not be set.") # Error message when the action is 'remark', but the destination_ports are set. ERROR_MESSAGE_ACTION_REMARK_DESTINATION_PORTS_SET = _("When the action is 'remark', Destination Ports must not be set.") @@ -54,12 +56,9 @@ class ACLRule(NetBoxModel): related_name="rules", verbose_name=_("Access List"), ) + + # Rule index = models.PositiveIntegerField() - remark = models.CharField( - verbose_name=_("Remark"), - max_length=500, - blank=True, - ) description = models.CharField( verbose_name=_("Description"), max_length=500, @@ -70,17 +69,75 @@ class ACLRule(NetBoxModel): max_length=30, choices=ACLRuleActionChoices, ) - source_prefix = models.ForeignKey( - to="ipam.prefix", + + # Remark + remark = models.CharField( + verbose_name=_("Remark"), + max_length=500, + blank=True, + ) + + # Source + source_type = models.ForeignKey( + to="contenttypes.ContentType", on_delete=models.PROTECT, related_name="+", + limit_choices_to=ACL_RULE_SOURCE_DESTINATION_MODELS, + verbose_name=_("Source Type"), + blank=True, + null=True, + ) + source_id = models.PositiveBigIntegerField( + verbose_name=_("Source ID"), + blank=True, + null=True, + ) + source = GenericForeignKey( + ct_field="source_type", + fk_field="source_id", + ) + + # Cached related objects by association name for faster access + _source_aggregate = models.ForeignKey( + to="ipam.aggregate", + on_delete=models.PROTECT, + related_name="_%(class)s_sources", + verbose_name=_("Source Aggregate"), + blank=True, + null=True, + ) + _source_ipaddress = models.ForeignKey( + to="ipam.ipaddress", + on_delete=models.PROTECT, + related_name="_%(class)s_sources", + verbose_name=_("Source IP-Address"), + blank=True, + null=True, + ) + _source_iprange = models.ForeignKey( + to="ipam.iprange", + on_delete=models.PROTECT, + related_name="_%(class)s_sources", + verbose_name=_("Source IP-Range"), + blank=True, + null=True, + ) + _source_prefix = models.ForeignKey( + to="ipam.prefix", + on_delete=models.PROTECT, + related_name="_%(class)s_sources", verbose_name=_("Source Prefix"), blank=True, null=True, ) - clone_fields = ("access_list", "action", "source_prefix") - prerequisite_models = ("netbox_acls.AccessList",) + clone_fields = ( + "access_list", + "action", + "source_id", + "source_type", + ) + prerequisite_models: tuple = ("netbox_acls.AccessList",) class Meta: """ @@ -91,12 +148,56 @@ class Meta: """ abstract = True + indexes = (models.Index(fields=("source_type", "source_id")),) ordering = ["access_list", "index"] unique_together = ["access_list", "index"] def __str__(self): return f"{self.access_list}: Rule {self.index}" + def clean(self): + """ + Override the model's clean method for custom field validation. + """ + # Validate source assignment + if self.source_type and not (self.source or self.source_id): + source_type = self.source_type.model_class() + raise ValidationError( + { + "source": _("Please select a source {source_type}.").format( + source_type=source_type._meta.verbose_name + ) + } + ) + super().clean() + + def save(self, *args, **kwargs): + """ + Saves the current instance to the database. + """ + # Cache the related source objects for faster access + self.cache_related_source_object() + + super().save(*args, **kwargs) + + def cache_related_source_object(self): + """ + Cache the related source objects for faster access. + """ + self._source_aggregate = self._source_ipaddress = self._source_iprange = self._source_prefix = None + if self.source_type: + source_type = self.source_type.model_class() + if source_type == apps.get_model("ipam", "aggregate"): + self._source_aggregate = self.source + elif source_type == apps.get_model("ipam", "ipaddress"): + self._source_ipaddress = self.source + elif source_type == apps.get_model("ipam", "iprange"): + self._source_iprange = self.source + elif source_type == apps.get_model("ipam", "prefix"): + self._source_prefix = self.source + + cache_related_source_object.alters_data = True + def get_absolute_url(self): """ The method is a Django convention; although not strictly required, @@ -140,7 +241,7 @@ def clean(self): Validate the ACL Standard Rule inputs. If the action is 'remark', then the remark field must be provided (non-empty), - and the source_prefix field must be empty. + and the source field must be empty. Conversely, if the remark field is provided, the action must be set to 'remark'. """ @@ -151,8 +252,8 @@ def clean(self): if self.action == ACLRuleActionChoices.ACTION_REMARK: if not self.remark: errors["remark"] = ERROR_MESSAGE_NO_REMARK - if self.source_prefix: - errors["source_prefix"] = ERROR_MESSAGE_ACTION_REMARK_SOURCE_PREFIX_SET + if self.source: + errors["source"] = ERROR_MESSAGE_ACTION_REMARK_SOURCE_SET # Validate that the action is "remark", when the remark field is provided elif self.remark: errors["remark"] = ERROR_MESSAGE_REMARK_WITHOUT_ACTION_REMARK @@ -164,7 +265,8 @@ def clean(self): class ACLExtendedRule(ACLRule): """ Inherits ACLRule. - Add ACLExtendedRule specific fields: source_ports, destination_prefix, destination_ports, and protocol + + Add ACLExtendedRule specific fields: source_ports, destination, destination_ports, and protocol """ access_list = models.ForeignKey( @@ -174,39 +276,91 @@ class ACLExtendedRule(ACLRule): limit_choices_to={"type": "extended"}, verbose_name=_("Extended Access List"), ) + + # Protocol + protocol = models.CharField( + verbose_name=_("Protocol"), + max_length=30, + choices=ACLProtocolChoices, + blank=True, + ) + + # Source source_ports = ArrayField( base_field=models.PositiveIntegerField(), verbose_name=_("Source Ports"), blank=True, null=True, ) - destination_prefix = models.ForeignKey( - to="ipam.prefix", + + # Destination + destination_type = models.ForeignKey( + to="contenttypes.ContentType", on_delete=models.PROTECT, related_name="+", - verbose_name=_("Destination Prefix"), + limit_choices_to=ACL_RULE_SOURCE_DESTINATION_MODELS, + verbose_name=_("Destination Type"), + blank=True, + null=True, + ) + destination_id = models.PositiveBigIntegerField( + verbose_name=_("Destination ID"), blank=True, null=True, ) + destination = GenericForeignKey( + ct_field="destination_type", + fk_field="destination_id", + ) destination_ports = ArrayField( base_field=models.PositiveIntegerField(), verbose_name=_("Destination Ports"), blank=True, null=True, ) - protocol = models.CharField( - verbose_name=_("Protocol"), - max_length=30, - choices=ACLProtocolChoices, + + # Cached related objects by association name for faster access + _destination_aggregate = models.ForeignKey( + to="ipam.aggregate", + on_delete=models.PROTECT, + related_name="_%(class)s_destinations", + verbose_name=_("Destination Aggregate"), + blank=True, + null=True, + ) + _destination_ipaddress = models.ForeignKey( + to="ipam.ipaddress", + on_delete=models.PROTECT, + related_name="_%(class)s_destinations", + verbose_name=_("Destination IP-Address"), blank=True, + null=True, + ) + _destination_iprange = models.ForeignKey( + to="ipam.iprange", + on_delete=models.PROTECT, + related_name="_%(class)s_destinations", + verbose_name=_("Destination IP-Range"), + blank=True, + null=True, + ) + _destination_prefix = models.ForeignKey( + to="ipam.prefix", + on_delete=models.PROTECT, + related_name="_%(class)s_destinations", + verbose_name=_("Destination Prefix"), + blank=True, + null=True, ) clone_fields = ( "access_list", "action", - "source_prefix", + "source_id", + "source_type", "source_ports", - "destination_prefix", + "destination_id", + "destination_type", "destination_ports", "protocol", ) @@ -221,6 +375,7 @@ class Meta(ACLRule.Meta): verbose_name = _("ACL Extended Rule") verbose_name_plural = _("ACL Extended Rules") + indexes = (models.Index(fields=("destination_type", "destination_id", "source_type", "source_id")),) def clean(self): """ @@ -228,27 +383,39 @@ def clean(self): When the action is 'remark', the remark field must be provided (non-empty), and the following fields must be empty: - - source_prefix + - source - source_ports - - destination_prefix + - destination - destination_ports - protocol Conversely, if a remark is provided, the action must be set to 'remark'. """ + # Validate destination assignment + if self.destination_type and not (self.destination or self.destination_id): + destination_type = self.destination_type.model_class() + raise ValidationError( + { + "destination": _("Please select a destination {destination_type}.").format( + destination_type=destination_type._meta.verbose_name, + ), + }, + ) + super().clean() + errors = {} # Validate that only the remark field is filled if self.action == ACLRuleActionChoices.ACTION_REMARK: if not self.remark: errors["remark"] = ERROR_MESSAGE_NO_REMARK - if self.source_prefix: - errors["source_prefix"] = ERROR_MESSAGE_ACTION_REMARK_SOURCE_PREFIX_SET + if self.source: + errors["source"] = ERROR_MESSAGE_ACTION_REMARK_SOURCE_SET if self.source_ports: errors["source_ports"] = ERROR_MESSAGE_ACTION_REMARK_SOURCE_PORTS_SET - if self.destination_prefix: - errors["destination_prefix"] = ERROR_MESSAGE_ACTION_REMARK_DESTINATION_PREFIX_SET + if self.destination: + errors["destination"] = ERROR_MESSAGE_ACTION_REMARK_DESTINATION_SET if self.destination_ports: errors["destination_ports"] = ERROR_MESSAGE_ACTION_REMARK_DESTINATION_PORTS_SET if self.protocol: @@ -260,5 +427,140 @@ def clean(self): if errors: raise ValidationError(errors) + def save(self, *args, **kwargs): + """ + Saves the current instance to the database. + """ + # Cache the related destination objects for faster access + self.cache_related_destination_objects() + + super().save(*args, **kwargs) + + def cache_related_destination_objects(self): + """ + Cache the related destination objects for faster access. + """ + self._destination_aggregate = self._destination_ipaddress = self._destination_iprange = ( + self._destination_prefix + ) = None + if self.destination_type: + destination_type = self.destination_type.model_class() + if destination_type == apps.get_model("ipam", "aggregate"): + self._destination_aggregate = self.destination + elif destination_type == apps.get_model("ipam", "ipaddress"): + self._destination_ipaddress = self.destination + elif destination_type == apps.get_model("ipam", "iprange"): + self._destination_iprange = self.destination + elif destination_type == apps.get_model("ipam", "prefix"): + self._destination_prefix = self.destination + + cache_related_destination_objects.alters_data = True + def get_protocol_color(self): return ACLProtocolChoices.colors.get(self.protocol) + + +# +# Generic Relations: ACLStandardRule +# + +# Source Aggregate +GenericRelation( + to=ACLStandardRule, + content_type_field="source_type", + object_id_field="source_id", + related_query_name="source_aggregate", +).contribute_to_class(Aggregate, "accesslist_standard_rule_sources") + +# Source IPAddress +GenericRelation( + to=ACLStandardRule, + content_type_field="source_type", + object_id_field="source_id", + related_query_name="source_ip_address", +).contribute_to_class(IPAddress, "accesslist_standard_rule_sources") + +# Source IPRange +GenericRelation( + to=ACLStandardRule, + content_type_field="source_type", + object_id_field="source_id", + related_query_name="source_ip_range", +).contribute_to_class(IPRange, "accesslist_standard_rule_sources") + +# Source Prefix +GenericRelation( + to=ACLStandardRule, + content_type_field="source_type", + object_id_field="source_id", + related_query_name="source_prefix", +).contribute_to_class(Prefix, "accesslist_standard_rule_sources") + + +# +# Generic Relations: ACLExtendedRule +# + +# Source Aggregate +GenericRelation( + to=ACLExtendedRule, + content_type_field="source_type", + object_id_field="source_id", + related_query_name="source_aggregate", +).contribute_to_class(Aggregate, "accesslist_extended_rule_sources") + +# Source IPAddress +GenericRelation( + to=ACLExtendedRule, + content_type_field="source_type", + object_id_field="source_id", + related_query_name="source_ip_address", +).contribute_to_class(IPAddress, "accesslist_extended_rule_sources") + +# Source IPRange +GenericRelation( + to=ACLExtendedRule, + content_type_field="source_type", + object_id_field="source_id", + related_query_name="source_ip_range", +).contribute_to_class(IPRange, "accesslist_extended_rule_sources") + +# Source Prefix +GenericRelation( + to=ACLExtendedRule, + content_type_field="source_type", + object_id_field="source_id", + related_query_name="source_prefix", +).contribute_to_class(Prefix, "accesslist_extended_rule_sources") + +# Destination Aggregate +GenericRelation( + to=ACLExtendedRule, + content_type_field="destination_type", + object_id_field="destination_id", + related_query_name="destination_aggregate", +).contribute_to_class(Aggregate, "accesslist_extended_rule_destinations") + +# Destination IPAddress +GenericRelation( + to=ACLExtendedRule, + content_type_field="destination_type", + object_id_field="destination_id", + related_query_name="destination_ip_address", +).contribute_to_class(IPAddress, "accesslist_extended_rule_destinations") + +# Destination IPRange +GenericRelation( + to=ACLExtendedRule, + content_type_field="destination_type", + object_id_field="destination_id", + related_query_name="destination_ip_range", +).contribute_to_class(IPRange, "accesslist_extended_rule_destinations") + +# Destination Prefix +GenericRelation( + to=ACLExtendedRule, + content_type_field="destination_type", + object_id_field="destination_id", + related_query_name="destination_prefix", +).contribute_to_class(Prefix, "accesslist_extended_rule_destinations") diff --git a/netbox_acls/tables.py b/netbox_acls/tables.py index bc26a23e..d346a124 100644 --- a/netbox_acls/tables.py +++ b/netbox_acls/tables.py @@ -141,6 +141,16 @@ class ACLStandardRuleTable(NetBoxTable): url_name="plugins:netbox_acls:aclstandardrule_list", ) + # Source + source_type = columns.ContentTypeColumn( + verbose_name=_("Source Type"), + ) + source = tables.Column( + verbose_name=_("Source"), + orderable=False, + linkify=True, + ) + class Meta(NetBoxTable.Meta): model = ACLStandardRule fields = ( @@ -152,14 +162,14 @@ class Meta(NetBoxTable.Meta): "remark", "tags", "description", - "source_prefix", + "source", ) default_columns = ( "access_list", "index", "action", "remark", - "source_prefix", + "source", "tags", ) @@ -181,6 +191,26 @@ class ACLExtendedRuleTable(NetBoxTable): ) protocol = ChoiceFieldColumn() + # Source + source_type = columns.ContentTypeColumn( + verbose_name=_("Source Type"), + ) + source = tables.Column( + verbose_name=_("Source"), + orderable=False, + linkify=True, + ) + + # Destination + destination_type = columns.ContentTypeColumn( + verbose_name=_("Destination Type"), + ) + destination = tables.Column( + verbose_name=_("Destination"), + orderable=False, + linkify=True, + ) + class Meta(NetBoxTable.Meta): model = ACLExtendedRule fields = ( @@ -192,9 +222,9 @@ class Meta(NetBoxTable.Meta): "remark", "tags", "description", - "source_prefix", + "source", "source_ports", - "destination_prefix", + "destination", "destination_ports", "protocol", ) @@ -203,10 +233,10 @@ class Meta(NetBoxTable.Meta): "index", "action", "remark", - "tags", - "source_prefix", + "protocol", + "source", "source_ports", - "destination_prefix", + "destination", "destination_ports", - "protocol", + "tags", ) diff --git a/netbox_acls/templates/netbox_acls/aclextendedrule.html b/netbox_acls/templates/netbox_acls/aclextendedrule.html index 3e2bad24..d40c0d42 100644 --- a/netbox_acls/templates/netbox_acls/aclextendedrule.html +++ b/netbox_acls/templates/netbox_acls/aclextendedrule.html @@ -45,16 +45,16 @@