diff --git a/.gitignore b/.gitignore index 894a44c..483b151 100644 --- a/.gitignore +++ b/.gitignore @@ -102,3 +102,4 @@ venv.bak/ # mypy .mypy_cache/ +PR* diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..5bacf27 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,3 @@ +{ + "python.analysis.typeCheckingMode": "standard" +} diff --git a/README.md b/README.md index 21cdd0c..30134ec 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,4 @@ - # python3-nmap A python 3 library which helps in using nmap port scanner. The way this tools works is by defining each nmap command into a python function making it very easy to use sophisticated nmap commands in other python scripts. Nmap is a complicated piece of software used for reconnaissance on target networks, over the years new features have been added making it more sophisticated. @@ -7,23 +6,30 @@ A python 3 library which helps in using nmap port scanner. The way this tools w With this python3-nmap we make using nmap in python very easy and painless For example in nmap if you want to scan for common ports you would to something like this + ```sh -$ nmap your-host.com --top-ports 10 +nmap your-host.com --top-ports 10 ``` + But in this python3-nmap script you would do something like this + ```py import nmap3 nmap = nmap3.Nmap() results = nmap.scan_top_ports("your-host.com") # And you would get your results in json ``` + You will notice each nmap command is defined as a python function/method. this make it easy to remember this in python and easily use them. Again in nmap if you want to use the famous dns-brute script you would do something like this + ```sh -$ nmap your-host.com --script dns-brute.nse +nmap your-host.com --script dns-brute.nse ``` + But in this python3 script again it's very easy you just do something like this + ```py import nmap3 nmap = nmap3.Nmap() @@ -43,9 +49,11 @@ results = nmap.nmap_dns_brute_script("your-host.com") ``` #### How to use python3-nmap + Using this scripts is very easy, though it assumes you have nmap already installed, as it is the primary dependence required. Also this tools supports both windows and linux, it's cross platform so to say. **Installation** + ```sh $ git clone https://github.com/wangoloj/python3-nmap.git @@ -57,7 +65,9 @@ $ apt-get install nmap # That's all is needed to get started ``` + In nmap some commands require root privileges for example the command to identify OS requires root privileges; + ```sh $ nmap -O your-host.com @@ -68,14 +78,17 @@ QUITTING! $ sudo nmap -O your-host.com ``` + The same applies to the script to be able to run the os identifier you have to be a super user. ### How to use the script to identify OS + ```py import nmap3 nmap = nmap3.Nmap() os_results = nmap.nmap_os_detection("192.168.178.2") # MOST BE ROOT ``` + ```json [ { @@ -134,23 +147,29 @@ os_results = nmap.nmap_os_detection("192.168.178.2") # MOST BE ROOT ``` ### Class components of python3-nmap + The script is made of up the following classes, each holding different nmap abilities and scan types. - - Nmap - - NmapHostDiscovery - - NmapScanTechniques +- Nmap +- NmapHostDiscovery +- NmapScanTechniques ### Identifying service version + In nmap if you want to identify versions you would run this kind of command + ```sh -$ nmap 192.168.178.1 -sV +nmap 192.168.178.1 -sV ``` + In this python script you would do something like this + ```py import nmap3 nmap = nmap3.Nmap() version_result = nmap.nmap_version_detection("your-host.com") ``` + ```json [ { @@ -210,121 +229,157 @@ version_result = nmap.nmap_version_detection("your-host.com") } ] ``` + ### Nmap commands available + The following nmaps commands have been added to the following scripts - - get Nmap version details +- get Nmap version details + ```python import nmap3 nmap = nmap3.Nmap() results = nmap.nmap_version() ``` - - Nmap top port scan + +- Nmap top port scan + ```python import nmap3 nmap = nmap3.Nmap() results = nmap.scan_top_ports("your-host") ``` - - Nmap Dns-brute-script( to get subdomains ) + +- Nmap Dns-brute-script( to get subdomains ) + ```python import nmap3 nmap = nmap3.Nmap() results = nmap.nmap_dns_brute_script("domain") ``` - - Nmap list scan + +- Nmap list scan + ```python import nmap3 nmap = nmap3.Nmap() results = nmap.nmap_list_scan("your-host") ``` - - Nmap Os detection + +- Nmap Os detection + ```python import nmap3 nmap = nmap3.Nmap() results = nmap.nmap_os_detection("your-host"); ``` - - Nmap subnet scan + +- Nmap subnet scan + ```python import nmap3 nmap = nmap3.Nmap() results = nmap.nmap_subnet_scan("your-host") #Must be root ``` - - Nmap version detection + +- Nmap version detection + ```python import nmap3 nmap = nmap3.Nmap() results = nmap.nmap_version_detection("your-host") # Must be root ``` -### Nmap Scanning Techniques +### Nmap Scanning Techniques + The script offers nmap scan techniques also as python function/methods - - nmap_fin_scan + +- nmap_fin_scan + ```python import nmap3 nmap = nmap3.NmapScanTechniques() result = nmap.nmap_fin_scan("192.168.178.1") ``` - - - nmap_idle_scan + +- nmap_idle_scan + ```python import nmap3 nmap = nmap3.NmapScanTechniques() result = nmap.nmap_idle_scan("192.168.178.1") ``` - - nmap_ping_scan + +- nmap_ping_scan + ```python import nmap3 nmap = nmap3.NmapScanTechniques() result = nmap.nmap_ping_scan("192.168.178.1") ``` - - nmap_syn_scan + +- nmap_syn_scan + ```python import nmap3 nmap = nmap3.NmapScanTechniques() result = nmap.nmap_syn_scan("192.168.178.1") ``` - - nmap_tcp_scan + +- nmap_tcp_scan + ```python import nmap3 nmap = nmap3.NmapScanTechniques() result = nmap.nmap_tcp_scan("192.168.178.1") ``` - + - nmap_udp_scan + ```python import nmap3 nmap = nmap3.NmapScanTechniques() result = nmap.nmap_udp_scan("192.168.178.1") ``` + ### Supporting the nmap host discovery + The script also offers support for map Added Nmap Host discovery techniques still as python function/methods - - Only port scan (-Pn) - - Only host discover (-sn) - - Arp discovery on a local network (-PR) - - Disable DNS resolution (-n) +- Only port scan (-Pn) +- Only host discover (-sn) +- Arp discovery on a local network (-PR) +- Disable DNS resolution (-n) NmapHostDiscovery - - `def nmap_portscan_only(self, host, args=None)` +- `def nmap_portscan_only(self, host, args=None)` + ```python import nmap3 nmap = nmap3.NmapHostDiscovery() results = nmap.nmap_portscan_only("your-host") ``` - - `def nmap_no_portscan(self, host, args=None):` + +- `def nmap_no_portscan(self, host, args=None):` + ```python import nmap3 nmap = nmap3.NmapHostDiscovery() results = nmap.nmap_no_portscan("your-host") ``` - - `def nmap_arp_discovery(self, host, args=None):` + +- `def nmap_arp_discovery(self, host, args=None):` + ```python import nmap3 nmap = nmap3.NmapHostDiscovery() results = nmap.nmap_arp_discovery("your-host") + ``` - - `def nmap_disable_dns(self, host, args=None):` + +- `def nmap_disable_dns(self, host, args=None):` + ```python import nmap3 nmap = nmap3.NmapHostDiscovery() @@ -333,7 +388,8 @@ NmapHostDiscovery Nmap is a large tool, as you can see python3-nmap provides only things what you could say commonly used nmap features. -### Using custom nmap command line arguments. +### Using custom nmap command line arguments + As we said, the script defines each set of nmap command as python function/methods. You can also pass arguments to those methods/function thus extending your capabilities for example. Let's say we want to scan top ports but also perform version detection . @@ -344,6 +400,7 @@ Let's say we want to scan top ports but also perform version detection . ``` ### Using the nmap vulners script to identify vulnerabilities (CVE's) + You scan the the target IP using version detection ('-sV') to get the service and, the script performs a lookup in the CVE database. The nmap vulners script is part of the default Nmap installation, so you shouldn't need to install any other packages. ```python @@ -353,15 +410,17 @@ You scan the the target IP using version detection ('-sV') to get the service an ``` ## Cross-Selling -* [Ethical-tools](https://ethicaltools.gitbook.io/subdomainfinder/) -* [Wappalyzer online](https://www.nmmapper.com/st/cms-detection/wappalyzer-online/) -* [Whatweb online](https://www.nmmapper.com/tools/cms-detection/whatweb-online/WhatWeb/) -* [Raccoon By Offensive security](https://www.nmmapper.com/tools/reconnaissance-tools/raccoon-vulnerability-scanning/Raccoon%20tool/) -* [Detect WAF](https://www.nmmapper.com/tools/reconnaissance-tools/waf/web-application-firewall-detector/) -* [Dnsdumpster](https://dnsdumpster.readthedocs.io/) -* [Become a patreon](https://www.patreon.com/nmmapper) -* [Online port scanner](https://www.nmmapper.com/st/networkmapper/nmap/online-port-scanning/) +- [Ethical-tools](https://ethicaltools.gitbook.io/subdomainfinder/) + +- [Wappalyzer online](https://www.nmmapper.com/st/cms-detection/wappalyzer-online/) +- [Whatweb online](https://www.nmmapper.com/tools/cms-detection/whatweb-online/WhatWeb/) +- [Raccoon By Offensive security](https://www.nmmapper.com/tools/reconnaissance-tools/raccoon-vulnerability-scanning/Raccoon%20tool/) +- [Detect WAF](https://www.nmmapper.com/tools/reconnaissance-tools/waf/web-application-firewall-detector/) +- [Dnsdumpster](https://dnsdumpster.readthedocs.io/) +- [Become a patreon](https://www.patreon.com/nmmapper) +- [Online port scanner](https://www.nmmapper.com/st/networkmapper/nmap/online-port-scanning/) ## Stargazers over time + [![Stargazers over time](https://starchart.cc/nmmapper/python3-nmap.svg?variant=adaptive)](https://starchart.cc/nmmapper/python3-nmap) diff --git a/docs/conf.py b/docs/conf.py index bf7d704..c94f215 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -17,9 +17,9 @@ # -- Project information ----------------------------------------------------- -project = 'python3-nmap' -copyright = '2019, Wangolo Joel' -author = 'Wangolo Joel' +project = "python3-nmap" +copyright = "2019, Wangolo Joel" +author = "Wangolo Joel" # -- General configuration --------------------------------------------------- @@ -27,16 +27,15 @@ # Add any Sphinx extension module names here, as strings. They can be # extensions coming with Sphinx (named 'sphinx.ext.*') or your custom # ones. -extensions = [ -] +extensions = [] # Add any paths that contain templates here, relative to this directory. -templates_path = ['_templates'] +templates_path = ["_templates"] # List of patterns, relative to source directory, that match files and # directories to ignore when looking for source files. # This pattern also affects html_static_path and html_extra_path. -exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store'] +exclude_patterns = ["_build", "Thumbs.db", ".DS_Store"] # -- Options for HTML output ------------------------------------------------- @@ -44,14 +43,14 @@ # The theme to use for HTML and HTML Help pages. See the documentation for # a list of builtin themes. # -html_theme = 'sphinx_rtd_theme' +html_theme = "sphinx_rtd_theme" # Add any paths that contain custom static files (such as style sheets) here, # relative to this directory. They are copied after the builtin static files, # so a file named "default.css" will overwrite the builtin "default.css". -html_static_path = ['_static'] +html_static_path = ["_static"] -#-----PERSONAL---- -master_doc = 'index' -source_suffix = '.rst' +# -----PERSONAL---- +master_doc = "index" +source_suffix = ".rst" diff --git a/example.py b/example.py index c6c5832..d5f946c 100644 --- a/example.py +++ b/example.py @@ -48,38 +48,38 @@ # } # } -import sys +import typing import nmap3 -import simplejson as json +import json from pygments import highlight, lexers, formatters -def scan_techniques(nmt, scan_type, target): - if scan_type == '-sF': +def scan_techniques(nmt: nmap3.NmapScanTechniques, scan_type: str, target: str): + if scan_type == "-sF": fin_scan = nmt.nmap_fin_scan(target) return fin_scan - elif scan_type == '-sI': + elif scan_type == "-sI": idle_scan = nmt.nmap_idle_scan(target) - return + return idle_scan - elif scan_type == '-sP': + elif scan_type == "-sP": ping_scan = nmt.nmap_ping_scan(target) return ping_scan - elif scan_type == '-sS': + elif scan_type == "-sS": syn_scan = nmt.nmap_syn_scan(target) return syn_scan - elif scan_type == '-F': + elif scan_type == "-F": syn_fast_scan = nmt.nmap_syn_scan(target) return syn_fast_scan - elif scan_type == '-sT': + elif scan_type == "-sT": tcp_scan = nmt.nmap_tcp_scan(target) return tcp_scan - elif scan_type == '-sU': + elif scan_type == "-sU": udp_scan = nmt.nmap_udp_scan(target) return udp_scan @@ -87,60 +87,62 @@ def scan_techniques(nmt, scan_type, target): raise ValueError("Not a scan technique") -def scan_discovery(nmd, scan_type, target, ports_num): - if scan_type == '-Pn': +def scan_discovery( + nmd: nmap3.NmapHostDiscovery, scan_type: str, target: str, ports_num: int +): + if scan_type == "-Pn": no_ping = nmd.nmap_portscan_only(target) return no_ping - elif scan_type == '-sn': + elif scan_type == "-sn": ping_scan = nmd.nmap_no_portscan(target) return ping_scan - elif scan_type == '-PR': + elif scan_type == "-PR": arp_scan = nmd.nmap_no_portscan(target) return arp_scan - elif scan_type == '-n': + elif scan_type == "-n": disable_dns = nmd.nmap_disable_dns(target) return disable_dns - elif scan_type == '-O --osscan-guess': + elif scan_type == "-O --osscan-guess": no_ping_os_detection = nmd.nmap_portscan_only(target, args=scan_type) return no_ping_os_detection - elif scan_type == '-A -T2': + elif scan_type == "-A -T2": no_ping_stealth = nmd.nmap_portscan_only(target, args=scan_type) return no_ping_stealth - elif scan_type == '-A': + elif scan_type == "-A": no_ping_advanced = nmd.nmap_portscan_only(target, args=scan_type) return no_ping_advanced - elif scan_type == '-A -v': + elif scan_type == "-A -v": no_ping_advanced_verbose = nmd.nmap_portscan_only(target, args=scan_type) return no_ping_advanced_verbose - elif scan_type == '-T4 -sV': + elif scan_type == "-T4 -sV": no_ping_aggressive_service = nmd.nmap_portscan_only(target, args=scan_type) return no_ping_aggressive_service - elif scan_type == '-n -A': + elif scan_type == "-n -A": no_ping_no_dns = nmd.nmap_portscan_only(target, args=scan_type) return no_ping_no_dns - elif scan_type == '-n -V': + elif scan_type == "-n -V": no_ping_advanced_service = nmd.nmap_portscan_only(target, args=scan_type) return no_ping_advanced_service - elif scan_type == '-f -A': + elif scan_type == "-f -A": no_ping_fragment = nmd.nmap_portscan_only(target, args=scan_type) return no_ping_fragment - elif scan_type == '-n -sV --version-intensity 3': + elif scan_type == "-n -sV --version-intensity 3": no_ping_version_intensity = nmd.nmap_portscan_only(target, args=scan_type) return no_ping_version_intensity - elif scan_type == '-O --osscan-guess -p ': + elif scan_type == "-O --osscan-guess -p ": scan_type = scan_type + str(ports_num) no_ping_detect_ports = nmd.nmap_portscan_only(target, args=scan_type) return no_ping_detect_ports @@ -149,40 +151,40 @@ def scan_discovery(nmd, scan_type, target, ports_num): raise ValueError("Not a scan technique") -def scan_command(nm, scan_type, target, domain): - if scan_type == '-sA': +def scan_command(nm: nmap3.Nmap, scan_type: str, target: str, domain: str): + if scan_type == "-sA": firewall_detect = nm.nmap_detect_firewall(target) return firewall_detect - elif scan_type == '-O': + elif scan_type == "-O": os_detect = nm.nmap_os_detection(target) return os_detect - elif scan_type == '--top-ports': + elif scan_type == "--top-ports": top_ports = nm.scan_top_ports(target) return top_ports - elif scan_type == '20 -sZ': + elif scan_type == "20 -sZ": top_ports_sctp = nm.scan_top_ports(target) return top_ports_sctp - elif scan_type == '-script dns-brute': + elif scan_type == "-script dns-brute": dns_brute = nm.nmap_dns_brute_script(domain) return dns_brute - elif scan_type == '-sL': + elif scan_type == "-sL": hostslist = nm.nmap_list_scan(target) return hostslist - elif scan_type == '-p-': + elif scan_type == "-p-": subnet_scan = nm.nmap_subnet_scan(target) return subnet_scan - elif scan_type == '-sV': + elif scan_type == "-sV": service_basic = nm.nmap_version_detection(target) return service_basic - elif scan_type == '-sX': + elif scan_type == "-sX": service_xmas = nm.nmap_version_detection(target, args=scan_type) return service_xmas @@ -190,47 +192,50 @@ def scan_command(nm, scan_type, target, domain): raise ValueError("Not a scan technique") -def launch(target, domain, ports, templates): - def tpl(i): +def launch( + target: str, + domain: typing.Optional[str] = None, + ports: typing.Optional[int] = None, + templates: typing.Optional[int] = None, +): + def tpl(i: int): template = { # OPTIONS FOR THE SCAN TECHNIQUE FUNCTION - 1: '-sF', # 'FIN scan' - 2: '-sI', # 'Idle scan' - 3: '-sS', # 'Default: TCP SYN scan' - 4: '-sP', # 'ping-only' - 5: '-sT', # 'TCP connect() scan' - 6: '-sU', # 'UDP scan' - 7: '-F', # 'Fast scan' - + 1: "-sF", # 'FIN scan' + 2: "-sI", # 'Idle scan' + 3: "-sS", # 'Default: TCP SYN scan' + 4: "-sP", # 'ping-only' + 5: "-sT", # 'TCP connect() scan' + 6: "-sU", # 'UDP scan' + 7: "-F", # 'Fast scan' # OPTIONS FOR THE SCAN DISCOVERY FUNCTION - 8: '-Pn', # 'No ping scan' - 9: '-sn', # 'Liveness detection: no port scan' - 10: '-PR', # 'ARP scan: local network only' - 11: '-n', # 'Disable DNS resolution: reduces noise' - 12: '-O --osscan-guess', # 'Used with no ping: aggressive OS detection' - 13: '-A', + 8: "-Pn", # 'No ping scan' + 9: "-sn", # 'Liveness detection: no port scan' + 10: "-PR", # 'ARP scan: local network only' + 11: "-n", # 'Disable DNS resolution: reduces noise' + 12: "-O --osscan-guess", # 'Used with no ping: aggressive OS detection' + 13: "-A", # 'Used with no ping: Advanced detection: OS detection and Version detection, Script scanning and Traceroute' - 14: '-A -T2', # 'Used with no ping: Advanced detection: with stealth scan mode' - 15: '-A -v', # 'Used with no ping: Advanced detection: verbose' - 16: '-n -A', # 'Used with no ping: Advanced detection: scan with no DNS resolution' - 17: '-f -A', # 'Used with no ping: Advanced detection: combined with packet fragmentation' - 18: '-T4 -sV', # 'Used with no ping: Aggressive service detection' - 19: '-n -sV --version-intensity 3', + 14: "-A -T2", # 'Used with no ping: Advanced detection: with stealth scan mode' + 15: "-A -v", # 'Used with no ping: Advanced detection: verbose' + 16: "-n -A", # 'Used with no ping: Advanced detection: scan with no DNS resolution' + 17: "-f -A", # 'Used with no ping: Advanced detection: combined with packet fragmentation' + 18: "-T4 -sV", # 'Used with no ping: Aggressive service detection' + 19: "-n -sV --version-intensity 3", # 'Used with no ping: Aggressive service detection: with version-intensity 3' - 20: '-n -V', # 'Used with no ping: Number version detection' - 21: '-O --osscan-guess -p ', # 'Used with no ping: OS detection with port selection' - + 20: "-n -V", # 'Used with no ping: Number version detection' + 21: "-O --osscan-guess -p ", # 'Used with no ping: OS detection with port selection' # OPTIONS FOR THE SCAN COMMAND FUNCTION - 22: '-sX', # 'Basic service detection combined with Xmas scan' - 23: '-sA', # 'Firewall rule detection: ACK scan' - 24: '-O', # 'OS detection' - 25: '20 -sZ', # 'SCTP: Advanced silent scan for top20 ports' - 26: '--top-ports', # 'Top ports scan (1000 ports)' - 27: '-script dns-brute', # 'Dns-brute-script( to get subdomains )' - 28: '-sL', + 22: "-sX", # 'Basic service detection combined with Xmas scan' + 23: "-sA", # 'Firewall rule detection: ACK scan' + 24: "-O", # 'OS detection' + 25: "20 -sZ", # 'SCTP: Advanced silent scan for top20 ports' + 26: "--top-ports", # 'Top ports scan (1000 ports)' + 27: "-script dns-brute", # 'Dns-brute-script( to get subdomains )' + 28: "-sL", # 'List scan: lists each host on the network(s) specified, without sending any packets to the target hosts' - 29: '-p-', # 'Subnet scan' - 30: '-sV' # 'Basic service detection' + 29: "-p-", # 'Subnet scan' + 30: "-sV", # 'Basic service detection' } return template.get(i) @@ -252,63 +257,91 @@ def tpl(i): if templates or domain: if ports: # Not in the final code - just for debug - choice = tpl(21) + str(ports) + choice = (tpl(21) or "") + str(ports) print("\n\nTrying option: ", choice) - tpl = tpl(21) - res = scan_discovery(nmd, tpl, target, ports) + scan_type = tpl(21) + if not scan_type: + raise ValueError("No scan type provided") + res = scan_discovery(nmd, scan_type, target, ports) # Print for debug - colored_json = highlight(json.dumps(res, indent=4, sort_keys=True), lexers.JsonLexer(), - formatters.TerminalFormatter()) + colored_json = highlight( + json.dumps(res, indent=4, sort_keys=True), + lexers.JsonLexer(), + formatters.TerminalFormatter(), + ) print("\n\n", colored_json) elif domain: - tpl = tpl(27) - res = scan_command(nm, tpl, None, domain) + scan_type = tpl(27) + if not scan_type: + raise ValueError("No scan type provided") + res = scan_command(nm, scan_type, "", domain) # Print for debug - colored_json = highlight(json.dumps(res, indent=4, sort_keys=True), lexers.JsonLexer(), - formatters.TerminalFormatter()) + colored_json = highlight( + json.dumps(res, indent=4, sort_keys=True), + lexers.JsonLexer(), + formatters.TerminalFormatter(), + ) print("\n\n", colored_json) else: - tpl = tpl(templates) - print("\n\nTrying option: ", tpl) + templates = typing.cast(int, templates) + scan_type = tpl(templates) + if not scan_type: + raise ValueError("No scan type provided") + print("\n\nTrying option: ", scan_type) if templates <= 7: - res = scan_techniques(nmt, tpl, target) + res = scan_techniques(nmt, scan_type, target) # Print for debug - colored_json = highlight(json.dumps(res, indent=4, sort_keys=True), lexers.JsonLexer(), - formatters.TerminalFormatter()) + colored_json = highlight( + json.dumps(res, indent=4, sort_keys=True), + lexers.JsonLexer(), + formatters.TerminalFormatter(), + ) print("\n\n", colored_json) elif templates in range(8, 22): - res = scan_discovery(nmd, tpl, target, None) + res = scan_discovery(nmd, scan_type, target, 0) # Print for debug - colored_json = highlight(json.dumps(res, indent=4, sort_keys=True), lexers.JsonLexer(), - formatters.TerminalFormatter()) + colored_json = highlight( + json.dumps(res, indent=4, sort_keys=True), + lexers.JsonLexer(), + formatters.TerminalFormatter(), + ) print("\n\n", colored_json) else: - res = scan_command(nm, tpl, target, None) + res = scan_command(nm, scan_type, target, "") # Print for debug - colored_json = highlight(json.dumps(res, indent=4, sort_keys=True), lexers.JsonLexer(), - formatters.TerminalFormatter()) + colored_json = highlight( + json.dumps(res, indent=4, sort_keys=True), + lexers.JsonLexer(), + formatters.TerminalFormatter(), + ) print("\n\n", colored_json) else: - tpl = tpl(3) - res = scan_techniques(nmt, tpl, target) + scan_type = tpl(3) + if not scan_type: + raise ValueError("No scan type provided") + res = scan_techniques(nmt, scan_type, target) # Print for debug - colored_json = highlight(json.dumps(res, indent=4, sort_keys=True), lexers.JsonLexer(), - formatters.TerminalFormatter()) + colored_json = highlight( + json.dumps(res, indent=4, sort_keys=True), + lexers.JsonLexer(), + formatters.TerminalFormatter(), + ) print("No option was set\n\n", colored_json) -if __name__ == '__main__': + +if __name__ == "__main__": launch("localhost", None, None, 24) diff --git a/nmap3/__init__.py b/nmap3/__init__.py index 99e9920..f957254 100644 --- a/nmap3/__init__.py +++ b/nmap3/__init__.py @@ -1,23 +1,23 @@ # __init__.py -# +# # Copyright 2020 Wangolo Joel -# +# # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. -# +# # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. -# +# # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, # MA 02110-1301, USA. -# -# +# +# from .nmap3 import * # noqa from .nmap3 import __author__ # noqa diff --git a/nmap3/exceptions.py b/nmap3/exceptions.py index bd3a15f..d1823be 100644 --- a/nmap3/exceptions.py +++ b/nmap3/exceptions.py @@ -1,43 +1,68 @@ # nmap3.py -# +# # Copyright 2019 Wangolo Joel -# +# # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. -# +# # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. -# +# # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, # MA 02110-1301, USA. -# +# -__author__ = 'Wangolo Joel (inquiry@nmapper.com)' -__version__ = '1.9.3' -__last_modification__ = 'Jun/06/2025' +__author__ = "Wangolo Joel (inquiry@nmapper.com)" +__version__ = "1.9.3" +__last_modification__ = "Jun/06/2025" -class NmapNotInstalledError(Exception): +import typing + + +class NmapError(Exception): + """Base exception for all nmap3 exceptions""" + + def __init__(self, message: str = "An error occurred in nmap3"): + self.message = message + super().__init__(message) + + +class NmapNotInstalledError(NmapError): """Exception raised when nmap is not installed""" - - def __init__(self, path=""): + + def __init__(self, path: typing.Optional[str] = None): self.message = f"Nmap is either not installed or we couldn't locate \ nmap path. Please ensure nmap is installed and provide right path string. \n\ Provided: *{path if path else 'Not provided'}*" super().__init__(self.message) - -class NmapXMLParserError(Exception): + + +class NmapXMLParserError(NmapError): """Exception raised when we can't parse the output""" - - def __init__(self, message="Unable to parse xml output"): - self.message = message + + def __init__(self, message: str = "Unable to parse xml output"): super().__init__(message) -class NmapExecutionError(Exception): + +class NmapExecutionError(NmapError): """Exception raised when en error occurred during nmap call""" + +class NmapTimeoutError(NmapError): + """Exception raised when nmap execution times out""" + + def __init__(self, message: str = "Nmap execution timed out"): + super().__init__(message) + + +class NmapPrivilegeError(NmapError): + """Exception raised when nmap requires root privileges to run""" + + def __init__(self, message: str = "Nmap requires root privileges to run"): + super().__init__(message) diff --git a/nmap3/nmap3.py b/nmap3/nmap3.py index 294678c..326c190 100644 --- a/nmap3/nmap3.py +++ b/nmap3/nmap3.py @@ -19,140 +19,266 @@ # # +import logging +import typing import shlex import subprocess import sys +import re import argparse import asyncio from xml.etree import ElementTree as ET from xml.etree.ElementTree import ParseError -from nmap3.nmapparser import NmapCommandParser -from nmap3.utils import get_nmap_path, user_is_root -from nmap3.exceptions import NmapXMLParserError, NmapExecutionError -import re -__author__ = 'Wangolo Joel (inquiry@nmapper.com)' -__version__ = '1.9.3' -__last_modification__ = 'Jun/06/2025' +from nmap3.nmapparser import NmapCommandParser +from nmap3.utils import ( + get_nmap_path, + user_is_root, + _terminate_process, + _terminate_asyncio_process, +) +from nmap3.exceptions import ( + NmapXMLParserError, + NmapExecutionError, + NmapTimeoutError, +) + +__author__ = "Wangolo Joel (inquiry@nmapper.com)" +__version__ = "1.9.3" +__last_modification__ = "Jun/14/2025" OS_TYPE = sys.platform +logger = logging.getLogger(__name__) -class Nmap(object): - """ - This nmap class allows us to use the nmap port scanner tool from within python - by calling nmap3.Nmap() - """ - def __init__(self, path:str=''): +class BaseNmap(object): + """Base class for ``nmap`` operations""" + + def __init__(self, path: typing.Optional[str] = None) -> None: """ - Module initialization + Initializes an inatance :param path: Path where nmap is installed on a user system. On linux system it's typically on /usr/bin/nmap. """ - - self.nmaptool = get_nmap_path(path) # check path, search or raise error + self.nmaptool = get_nmap_path(path) # check path, search or raise error self.default_args = "{nmap} {outarg} - " self.maxport = 65535 self.target = "" - self.top_ports = dict() + self.top_ports: typing.Dict[str, typing.Any] = {} self.parser = NmapCommandParser(None) - self.raw_output = None - self.as_root = False + self.raw_output: typing.Optional[str] = None + # self.as_root = False + # """Whether to run as root/administrator""" + + # With this implementation, this method is redundant + # def require_root(self, required: bool = True) -> None: + # """ + # Sets or unsets the instance to run command as 'root' user. + # + # :param required: If True, the nmap command will be run with root privileges + # :param root: The root command to use (default is "sudo" on Unix-like systems). + # Can be "doas' in some Unix distributions + # """ + # self.as_root = required + + def default_command(self) -> str: + """ + Returns the default/root nmap command + that will be chained with all others - def require_root(self, required=True): + e.g nmap -oX - """ - Call this method to add "sudo" in front of nmap call + command = self.default_args.format( + nmap=self.nmaptool, outarg="-v -oX" + ) # adding extra verbosity to feed "task_results" output + return command + + # This does not reaaly do much and may even introduce a recursion error + # def default_command_privileged(self): + # """ + # Commands that require root privileges + # """ + # if OS_TYPE == 'win32': + # # Elevate privileges and return nmap command + # # For windows now is not fully supported so just return the default + # return self.default_command() + # else: + # return self.default_args.format(nmap=self.nmaptool, outarg="-oX") + + + def get_xml_et(self, command_output: str) -> ET.Element: """ - self.as_root = required + Parses the command output and returns an XML ElementTree root element - def default_command(self): + :param command_output: The output of the nmap command as a string + :return : An XML ElementTree root element representing the parsed output + :raises NmapXMLParserError: If the output cannot be parsed as XML """ - Returns the default nmap command - that will be chained with all others - eg nmap -oX - + try: + self.raw_output = command_output + return ET.fromstring(command_output) + except ParseError: + raise NmapXMLParserError() + + def get_success_xml_et(self, file_name: str) -> ET.Element: """ - if self.as_root: - return self.default_command_privileged() - #return self.default_args.format(nmap=self.nmaptool, outarg="-oX") - return self.default_args.format(nmap=self.nmaptool, outarg="-v -oX") # adding extra verbosity to feed "task_results" output + Returns an XML Element indicating a successful scan - def default_command_privileged(self): + :param file_name: The name of the file where the scan results are saved + :return: An XML Element with success message and file path """ - Commands that require root privileges + root = ET.Element("root") + success = ET.SubElement(root, "success") + success.text = "Nmap scan completed successfully." + file_path = ET.SubElement(root, "file_path") + file_path.text = "{}".format(file_name) + return root + + +class Nmap(BaseNmap): + """Implements an interface to allows the use of ``nmap`` port scanner tool from within python""" + + def run_command( + self, cmd: typing.List[str], timeout: typing.Optional[float] = None + ) -> str: """ - if OS_TYPE == 'win32': - # Elevate privileges and return nmap command - # For windows now is not fully supported so just return the default - return self.default_command() - else: - return self.default_args.format(nmap=self.nmaptool, outarg="-oX") + Runs the nmap command using popen - def nmap_version(self): + :param cmd: the command we want run, as a list, + e.g /usr/bin/nmap -oX - nmmapper.com --top-ports 10 + :param timeout: command subprocess timeout in seconds. + :return: The output of the command (in the console) as a string """ - Returns nmap version and build details + process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + logger.debug(f"Created subprocess with PID {process.pid!r}") + try: + output, errs = process.communicate(timeout=timeout) + except subprocess.CalledProcessError as e: + _terminate_process(process, timeout=0.2) + raise NmapExecutionError( + 'Error during command: "' + " ".join(cmd) + '"\n\n' + str(e) + ) from e + except subprocess.TimeoutExpired as e: + _terminate_process(process, timeout=0.2) + raise NmapTimeoutError( + 'Command timed out after {timeout} seconds: "'.format(timeout=timeout) + + " ".join(cmd) + + '"\n\n' + + str(e) + ) from e + else: + if process.returncode != 0: + raise NmapExecutionError( + 'Error during command: "' + + " ".join(cmd) + + '"\n\n' + + str(errs.decode("utf-8")) + ) + # Response is bytes so decode the output and return + return output.decode("utf-8").strip() + + def scan_command( + self, + target: str, + arg: str, + args: typing.Optional[str] = None, + timeout: typing.Optional[float] = None, + ) -> ET.Element: """ - # nmap version output is not available in XML format (eg. -oX -) - output = self.run_command([self.nmaptool, '--version']) - version_data = {} - for line in output.splitlines(): - if line.startswith('Nmap version '): - version_string = line.split(' ')[2] - version_data['nmap'] = tuple([int(_) for _ in version_string.split('.')]) - elif line.startswith('Compiled with:'): - compiled_with = line.split(':')[1].strip() - version_data['compiled_with'] = tuple(compiled_with.split(' ')) - elif line.startswith('Compiled without:'): - compiled_without = line.split(':')[1].strip() - version_data['compiled_without'] = tuple(compiled_without.split(' ')) - elif line.startswith('Available nsock engines:'): - nsock_engines = line.split(':')[1].strip() - version_data['nsock_engines'] = tuple(nsock_engines.split(' ')) - return version_data + Perform nmap scan using the specified arguments - # Unique method for repetitive tasks - Use of 'target' variable instead of 'host' or 'subnet' - no need to make difference between 2 strings that are used for the same purpose - def scan_command(self, target, arg, args=None, timeout=None): + :param target: The target to scan (IP address or domain) + :param arg: The nmap argument to use for the scan (e.g., "-sS", "-sV", etc.) + :param args: Additional arguments for the scan + :param timeout: Timeout for the scan command in seconds + :return: An XML ElementTree root element containing the scan results + """ self.target = target command_args = "{target} {default}".format(target=target, default=arg) scancommand = self.default_command() + command_args - if (args): + if args: scancommand += " {0}".format(args) scan_shlex = shlex.split(scancommand) output = self.run_command(scan_shlex, timeout=timeout) - file_name=re.search(r'(\-oX|-oN-|oG)\s+[a-zA-Z-_0-9]{1,100}\.[a-zA-Z]+',scancommand) - if file_name: - file_name=scancommand[file_name.start():file_name.end()].split(" ")[0] + file_name_match = re.search( + r"(\-oX|-oN-|oG)\s+[a-zA-Z-_0-9]{1,100}\.[a-zA-Z]+", scancommand + ) + if file_name_match: + file_name = scancommand[ + file_name_match.start() : file_name_match.end() + ].split(" ")[0] return self.get_success_xml_et(file_name) xml_root = self.get_xml_et(output) return xml_root - def scan_top_ports(self, target, default=10, args=None, timeout=None): + def nmap_version(self) -> typing.Dict[str, typing.Tuple[str, ...]]: + """ + Returns nmap version and build details + + :return: A dictionary containing nmap version, compiled with, compiled without, and available nsock engines + """ + # nmap version output is not available in XML format (eg. -oX -) + output = self.run_command([self.nmaptool, "--version"]) + version_data: typing.Dict[str, typing.Tuple[str, ...]] = {} + + for line in output.splitlines(): + if line.startswith("Nmap version "): + version_string = line.split(" ")[2] + version_data["nmap"] = tuple( + [part for part in version_string.split(".")] + ) + elif line.startswith("Compiled with:"): + compiled_with = line.split(":")[1].strip() + version_data["compiled_with"] = tuple(compiled_with.split(" ")) + elif line.startswith("Compiled without:"): + compiled_without = line.split(":")[1].strip() + version_data["compiled_without"] = tuple(compiled_without.split(" ")) + elif line.startswith("Available nsock engines:"): + nsock_engines = line.split(":")[1].strip() + version_data["nsock_engines"] = tuple(nsock_engines.split(" ")) + return version_data + + def scan_top_ports( + self, + target: str, + default: int = 10, + args: typing.Optional[str] = None, + timeout: typing.Optional[float] = None, + ) -> typing.Dict[str, typing.Any]: """ Perform nmap's top ports scan - :param: target can be IP or domain - :param: default is the default top port + :param target: can be IP or domain + :param default: is the default top port + :param args: additional arguments for the scan + :param timeout: timeout for the scan command + :return: A dictionary containing top ports found on the target This top port requires root previledges """ - if (default > self.maxport): + if default > self.maxport: raise ValueError("Port can not be greater than default 65535") self.target = target - if (args): - assert (isinstance(args, str)), "Expected string got {0} instead".format(type(args)) + if args: + assert isinstance(args, str), "Expected string got {0} instead".format( + type(args) + ) - top_port_args = " {target} --top-ports {default}".format(target=target, default=default) + top_port_args = " {target} --top-ports {default}".format( + target=target, default=default + ) scan_command = self.default_command() + top_port_args - if (args): + if args: scan_command += " {0}".format(args) scan_shlex = shlex.split(scan_command) # Run the command and get the output output = self.run_command(scan_shlex, timeout=timeout) if not output: - # Probaby and error was raise + # An error was probably raised raise ValueError("Unable to perform requested command") # Begin parsing the xml response @@ -160,21 +286,41 @@ def scan_top_ports(self, target, default=10, args=None, timeout=None): self.top_ports = self.parser.filter_top_ports(xml_root) return self.top_ports - def nmap_dns_brute_script(self, target, dns_brute="--script dns-brute.nse", args=None, timeout=None): + def nmap_dns_brute_script( + self, + target: str, + dns_brute: str = "--script dns-brute.nse", + args: typing.Optional[str] = None, + timeout: typing.Optional[float] = None, + ) -> typing.List[typing.Dict[str, typing.Any]]: """ Perform nmap scan using the dns-brute script - :param: target can be IP or domain - :param: default is the default top port - - nmap -oX - nmmapper.com --script dns-brute.nse + :param target: can be IP or domain. + :param dns_brute: the dns-brute script to use + :param args: additional arguments for the scan + :param timeout: timeout for the scan command + :return: List of subdomains found by the dns-brute script + + Example usage: + ```python + from nmap3 import NmapScanTechniques + + nmap = NmapScanTechniques() + target = "nmmapper.com" + dns_brute = "--script dns-brute.nse" + args = "--script-args dns-brute.timeout=5" + timeout = 10 # seconds + subdomains = nmap.nmap_dns_brute_script(target, dns_brute, args, timeout) + print(subdomains) + ``` """ self.target = target dns_brute_args = "{target} {default}".format(target=target, default=dns_brute) - + if args: dns_brute_args += " {0}".format(args) - + dns_brute_command = self.default_command() + dns_brute_args dns_brute_shlex = shlex.split(dns_brute_command) # prepare it for popen @@ -186,13 +332,39 @@ def nmap_dns_brute_script(self, target, dns_brute="--script dns-brute.nse", args subdomains = self.parser.filter_subdomains(xml_root) return subdomains - def nmap_version_detection(self, target, arg="-sV", args=None, timeout=None): + def nmap_version_detection( + self, + target: str, + arg: str = "-sV", + args: typing.Optional[str] = None, + timeout: typing.Optional[float] = None, + ) -> typing.Dict[str, typing.Any]: """ Perform nmap scan using the dns-brute script - :param: target can be IP or domain + :param target: can be IP or domain. + :param arg: nmap argument for version detection, default is "-sV" + :param args: additional arguments for the scan + :param timeout: timeout for the scan command + :return: A dictionary containing service names and their versions + Example command line usage: + ``` nmap -oX - nmmapper.com --script dns-brute.nse + ``` + + Example usage: + ```python + from nmap3 import NmapScanTechniques + nmap = NmapScanTechniques() + + target = "nmmapper.com" + arg = "-sV" + args = "--script-args dns-brute.timeout=5" + timeout = 10 # seconds + services = nmap.nmap_version_detection(target, arg, args, timeout) + print(services) + ``` """ xml_root = self.scan_command(target=target, arg=arg, args=args, timeout=timeout) services = self.parser.filter_top_ports(xml_root) @@ -200,106 +372,116 @@ def nmap_version_detection(self, target, arg="-sV", args=None, timeout=None): # Using of basic options for stealth scan @user_is_root - def nmap_stealth_scan(self, target, arg="-Pn -sZ", args=None): + def nmap_stealth_scan( + self, target: str, arg: str = "-Pn -sZ", args: typing.Optional[str] = None + ) -> typing.Dict[str, typing.Any]: """ + Perform nmap's stealth scan on the target + + NOTE: This ``nmap`` scan command requires root/administrator privileges + + :param target: can be IP or domain. + :param arg: nmap argument for stealth scan, default is "-Pn -sZ" + :param args: additional arguments for the scan + :return: List of top ports found on the target + + Example command line usage: nmap -oX - nmmapper.com -Pn -sZ """ xml_root = self.scan_command(target=target, arg=arg, args=args) self.top_ports = self.parser.filter_top_ports(xml_root) return self.top_ports - def nmap_detect_firewall(self, target, arg="-sA", args=None): # requires root + def nmap_detect_firewall( + self, target: str, arg: str = "-sA", args: typing.Optional[str] = None + ) -> ET.Element: # requires root """ + Perform nmap's firewall detection scan on the target + + :param target: can be IP or domain. + :param arg: nmap argument for firewall detection, default is "-sA" + :param args: additional arguments for the scan + :return: XML ElementTree root element containing the scan results + + Example command line usage: + ``` nmap -oX - nmmapper.com -sA - @ TODO + ``` """ return self.scan_command(target=target, arg=arg, args=args) # TODO @user_is_root - def nmap_os_detection(self, target, arg="-O", args=None): # requires root + def nmap_os_detection( + self, target: str, arg: str = "-O", args: typing.Optional[str] = None + ) -> typing.Dict[str, typing.Any]: # requires root """ + Perform nmap's OS detection scan on the target + + NOTE: This ``nmap`` scan command requires root/administrator privileges + + :param target: can be IP or domain. + :param arg: nmap argument for OS detection, default is "-O" + :param args: additional arguments for the scan + :return: Dictionary containing OS information and other details + + Example command line usage: + ``` nmap -oX - nmmapper.com -O - NOTE: Requires root + ``` """ xml_root = self.scan_command(target=target, arg=arg, args=args) results = self.parser.os_identifier_parser(xml_root) return results - def nmap_subnet_scan(self, target, arg="-p-", args=None): # requires root + def nmap_subnet_scan( + self, target: str, arg: str = "-p-", args: typing.Optional[str] = None + ) -> typing.Dict[str, typing.Any]: # may require root """ + Perform nmap's subnet scan on the target + + NOTE: This ``nmap`` scan command may require root/administrator privileges + + :param target: can be IP or domain. + :param arg: nmap argument for subnet scan, default is "-p-" + :param args: additional arguments for the scan + :return: Dictionary containing open ports found in the specified subnet + + Example command line usage: + ``` nmap -oX - nmmapper.com -p- - NOTE: Requires root + ``` """ xml_root = self.scan_command(target=target, arg=arg, args=args) results = self.parser.filter_top_ports(xml_root) return results - def nmap_list_scan(self, target, arg="-sL", args=None): # requires root + def nmap_list_scan( + self, target: str, arg: str = "-sL", args: typing.Optional[str] = None + ) -> typing.Dict[str, typing.Any]: # requires root """ The list scan is a degenerate form of target discovery that simply lists each target of the network(s) specified, without sending any packets to the target targets. NOTE: /usr/bin/nmap -oX - 192.168.178.1/24 -sL + + :param target: can be IP or domain. + :param arg: nmap argument for list scan, default is "-sL" + :param args: additional arguments for the scan + :return: List of targets found in the specified network """ self.target = target xml_root = self.scan_command(target=target, arg=arg, args=args) results = self.parser.filter_top_ports(xml_root) return results - def run_command(self, cmd, timeout=None): - """ - Runs the nmap command using popen - - @param: cmd--> the command we want run eg /usr/bin/nmap -oX - nmmapper.com --top-ports 10 - @param: timeout--> command subprocess timeout in seconds. - """ - sub_proc = subprocess.Popen( - cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE - ) - try: - output, errs = sub_proc.communicate(timeout=timeout) - except Exception as e: - sub_proc.kill() - raise (e) - else: - if 0 != sub_proc.returncode: - raise NmapExecutionError( - 'Error during command: "' + ' '.join(cmd) + '"\n\n' \ - + errs.decode('utf8') - ) - # Response is bytes so decode the output and return - return output.decode('utf8').strip() - - - def get_xml_et(self, command_output): - """ - @ return xml ET - """ - try: - self.raw_output = command_output - return ET.fromstring(command_output) - except ParseError: - raise NmapXMLParserError() - - - def get_success_xml_et(self,file_name): - root = ET.Element("root") - success = ET.SubElement(root, "success") - success.text = "Nmap scan completed successfully." - file_path = ET.SubElement(root, "file_path") - file_path.text = "{}".format(file_name) - ET.ElementTree(root) - return root class NmapScanTechniques(Nmap): """ - Extends Nmap to include nmap commands + Extends `Nmap` to include nmap commands with different scan techniques - This scan techniques include + These scan techniques include: 1) TCP SYN Scan (-sS) 2) TCP connect() scan (-sT) @@ -310,9 +492,8 @@ class NmapScanTechniques(Nmap): 7) IP Scan (-sO) """ - def __init__(self, path:str=''): + def __init__(self, path: typing.Optional[str] = None): super(NmapScanTechniques, self).__init__(path=path) - self.sync_scan = "-sS" self.tcp_connt = "-sT" self.fin_scan = "-sF" @@ -320,124 +501,202 @@ def __init__(self, path:str=''): self.idle_scan = "-sL" self.udp_scan = "-sU" self.ip_scan = "-sO" - self.parser = NmapCommandParser(None) - - # Unique method for repetitive tasks - Use of 'target' variable instead of 'host' or 'subnet' - no need to make difference between 2 strings that are used for the same purpose. Creating a scan template as a switcher - def scan_command(self, scan_type, target, args, timeout=None): - def tpl(i): - scan_template = { - 1: self.fin_scan, - 2: self.sync_scan, - 3: self.tcp_connt, - 4: self.ping_scan, - 5: self.idle_scan, - 6: self.udp_scan, - 7: self.ip_scan - } - - return scan_template.get(i) - - for i in range(1, 8): - if scan_type == tpl(i): - scan = " {target} {default}".format(target=target, default=scan_type) - scan_type_command = self.default_command() + scan + self.scan_types = { + self.fin_scan, + self.sync_scan, + self.tcp_connt, + self.ping_scan, + self.idle_scan, + self.udp_scan, + self.ip_scan, + } + + def scan_command( # type: ignore[override] + self, + scan_type: str, + target: str, + args: typing.Optional[str] = None, + timeout: typing.Optional[float] = None, + ) -> ET.Element: + """ + Perform nmap scan using the specified scan type - if (args): - scan_type_command += " {0}".format(args) + :param scan_type: The type of scan to perform (e.g., "-sS", "-sT", etc.) + :param target: The target to scan (IP address or domain) + :param args: Additional arguments for the scan + :param timeout: Timeout for the scan command in seconds + """ + if scan_type not in self.scan_types: + raise ValueError( + f"Invalid scan type: {scan_type}. Valid types are: {self.scan_types}" + ) - scan_shlex = shlex.split(scan_type_command) + scan = " {target} {default}".format(target=target, default=scan_type) + scan_type_command = self.default_command() + scan - # Use the ping scan parser - output = self.run_command(scan_shlex, timeout=timeout) - xml_root = self.get_xml_et(output) + if args: + scan_type_command += " {0}".format(args) - return xml_root - raise Exception("Something went wrong") - + scan_shlex = shlex.split(scan_type_command) + output = self.run_command(scan_shlex, timeout=timeout) + xml_root = self.get_xml_et(output) + return xml_root @user_is_root - def nmap_fin_scan(self, target, args=None): + def nmap_fin_scan( + self, target: str, args: typing.Optional[str] = None + ) -> typing.Dict[str, typing.Any]: """ Perform scan using nmap's fin scan - @cmd nmap -sF 192.168.178.1 + NOTE: This ``nmap`` scan command requires root/administrator privileges + + :param target: can be IP or domain. + :param args: additional arguments for the scan + :return: A dictionary of open ports found on the target + Example command line usage: + ``` + nmap -sF 192.168.178.1 + ``` """ xml_root = self.scan_command(self.fin_scan, target=target, args=args) results = self.parser.filter_top_ports(xml_root) return results - + @user_is_root - def nmap_syn_scan(self, target, args=None): + def nmap_syn_scan( + self, target: str, args: typing.Optional[str] = None + ) -> typing.Dict[str, typing.Any]: """ - Perform syn scan on this given - target + Perform syn scan on this given target + + NOTE: This ``nmap`` scan command requires root/administrator privileges - @cmd nmap -sS 192.168.178.1 + :param target: can be IP or domain. + :param args: additional arguments for the scan + :return: A dictionary of open ports found on the target + + Example command line usage: + ``` + nmap -sS 192.168.178.1 + ``` """ xml_root = self.scan_command(self.sync_scan, target=target, args=args) results = self.parser.filter_top_ports(xml_root) return results - def nmap_tcp_scan(self, target, args=None): + def nmap_tcp_scan( + self, target: str, args: typing.Optional[str] = None + ) -> typing.Dict[str, typing.Any]: """ Scan target using the nmap tcp connect - @cmd nmap -sT 192.168.178.1 + :param target: can be IP or domain. + :param args: additional arguments for the scan + :return: A dictionary of open ports found on the target + + Example command line usage: + ``` + nmap -sT 192.168.178.1 + ``` """ - if (args): - assert (isinstance(args, str)), "Expected string got {0} instead".format(type(args)) + if args: + assert isinstance(args, str), "Expected string got {0} instead".format( + type(args) + ) xml_root = self.scan_command(self.tcp_connt, target=target, args=args) results = self.parser.filter_top_ports(xml_root) return results - + @user_is_root - def nmap_udp_scan(self, target, args=None): + def nmap_udp_scan( + self, target: str, args: typing.Optional[str] = None + ) -> typing.Dict[str, typing.Any]: """ Scan target using the nmap tcp connect - @cmd nmap -sU 192.168.178.1 + NOTE: This ``nmap`` scan command requires root/administrator privileges + + :param target: can be IP or domain. + :param args: additional arguments for the scan + :return: A dictionary of open ports found on the target + + Example command line usage: + ``` + nmap -sU 192.168.178.1 + ``` """ - if (args): - assert (isinstance(args, str)), "Expected string got {0} instead".format(type(args)) + if args: + assert isinstance(args, str), "Expected string got {0} instead".format( + type(args) + ) xml_root = self.scan_command(self.udp_scan, target=target, args=args) results = self.parser.filter_top_ports(xml_root) return results - def nmap_ping_scan(self, target, args=None): + def nmap_ping_scan( + self, target: str, args: typing.Optional[str] = None + ) -> typing.Dict[str, typing.Any]: """ - Scan target using nmaps' ping scan + Scan target using nmap's ping scan - @cmd nmap -sP 192.168.178.1 + :param target: can be IP or domain. + :param args: additional arguments for the scan + :return: A dictionary of open ports found on the target + + Example command line usage: + ``` + nmap -sP 192.168.178.1 + ``` """ xml_root = self.scan_command(self.ping_scan, target=target, args=args) results = self.parser.filter_top_ports(xml_root) return results - def nmap_idle_scan(self, target, args=None): + def nmap_idle_scan( + self, target: str, args: typing.Optional[str] = None + ) -> typing.Dict[str, typing.Any]: """ - Using nmap idle_scan + Scan target using nmap's idle scan + + :param target: can be IP or domain. + :param args: additional arguments for the scan + :return: A dictionary of open ports found on the target - @cmd nmap -sL 192.168.178.1 + Example command line usage: + ``` + nmap -sL 192.168.178.1 + ``` """ xml_root = self.scan_command(self.idle_scan, target=target, args=args) results = self.parser.filter_top_ports(xml_root) return results - def nmap_ip_scan(self, target, args=None): + def nmap_ip_scan( + self, target: str, args: typing.Optional[str] = None + ) -> typing.Dict[str, typing.Any]: """ - Using nmap ip_scan + Scan target using nmap's ip scan - @cmd nmap -sO 192.168.178.1 + :param target: can be IP or domain. + :param args: additional arguments for the scan + :return: A dictionary of open ports found on the target + + Example command line usage: + ``` + nmap -sO 192.168.178.1 + ``` """ xml_root = self.scan_command(self.ip_scan, target=target, args=args) results = self.parser.filter_top_ports(xml_root) return results + class NmapHostDiscovery(Nmap): """ - This object will perform host discovery + Extends `Nmap` to include nmap commands to perform host discovery 1) Only port scan (-Pn) 2) Only host discover (-sn) @@ -445,223 +704,664 @@ class NmapHostDiscovery(Nmap): 4) Disable DNS resolution (-n) """ - def __init__(self, path:str=''): + def __init__(self, path: str = "") -> None: super(NmapHostDiscovery, self).__init__(path=path) self.port_scan_only = "-Pn" self.no_port_scan = "-sn" self.arp_discovery = "-PR" self.disable_dns = "-n" + self.scan_types = { + self.port_scan_only, + self.no_port_scan, + self.arp_discovery, + self.disable_dns, + } self.parser = NmapCommandParser(None) - def scan_command(self, scan_type, target, args, timeout=None): - def tpl(i): - scan_template = { - 1: self.port_scan_only, - 2: self.no_port_scan, - 3: self.arp_discovery, - 4: self.disable_dns - } - - return scan_template.get(i) - - for i in range(1, 5): - if scan_type == tpl(i): - scan = " {target} {default}".format(target=target, default=scan_type) - scan_type_command = self.default_command() + scan + def scan_command( # type: ignore[override] + self, + scan_type: str, + target: str, + args: typing.Optional[str] = None, + timeout: typing.Optional[float] = None, + ) -> ET.Element: + """ + Perform host discovery scan using the specified scan type - if (args): - scan_type_command += " {0}".format(args) + :param scan_type: The type of scan to perform (e.g., "-Pn", "-sn", etc.) + :param target: The target to scan (IP address or domain) + :param args: Additional arguments for the scan + :param timeout: Timeout for the scan command in seconds + """ + if scan_type not in self.scan_types: + raise ValueError( + f"Invalid scan type: {scan_type}. Valid types are: {self.scan_types}" + ) - scan_shlex = shlex.split(scan_type_command) + scan = " {target} {default}".format(target=target, default=scan_type) + scan_type_command = self.default_command() + scan - # Use the ping scan parser - output = self.run_command(scan_shlex, timeout=timeout) - xml_root = self.get_xml_et(output) + if args: + scan_type_command += " {0}".format(args) - return xml_root - raise Exception("Something went wrong") + scan_shlex = shlex.split(scan_type_command) + output = self.run_command(scan_shlex, timeout=timeout) + xml_root = self.get_xml_et(output) + return xml_root - def nmap_portscan_only(self, target, args=None): + def nmap_portscan_only( + self, target: str, args: typing.Optional[str] = None + ) -> typing.Dict[str, typing.Any]: """ Scan target using the nmap tcp connect - @cmd nmap -Pn 192.168.178.1 + :param target: can be IP or domain. + :param args: additional arguments for the scan + :return: A dictionary of open ports found on the target + + Example command line usage: + ``` + nmap -Pn 192.168.178.1 + ``` """ xml_root = self.scan_command(self.port_scan_only, target=target, args=args) results = self.parser.filter_top_ports(xml_root) return results - def nmap_no_portscan(self, target, args=None): + def nmap_no_portscan( + self, target: str, args: typing.Optional[str] = None + ) -> typing.Dict[str, typing.Any]: """ Scan target using the nmap tcp connect - @cmd nmap -sn 192.168.178.1 + :param target: can be IP or domain. + :param args: additional arguments for the scan + :return: A dictionary of open ports found on the target + + Example command line usage: + ``` + nmap -sn 192.168.178.1 + ``` """ - if (args): - assert (isinstance(args, str)), "Expected string got {0} instead".format(type(args)) + if args: + assert isinstance(args, str), "Expected string got {0} instead".format( + type(args) + ) xml_root = self.scan_command(self.no_port_scan, target=target, args=args) results = self.parser.filter_top_ports(xml_root) return results - def nmap_arp_discovery(self, target, args=None): + def nmap_arp_discovery( + self, target: str, args: typing.Optional[str] = None + ) -> typing.Dict[str, typing.Any]: """ Scan target using the nmap tcp connect - @cmd nmap -PR 192.168.178.1 + + :param target: can be IP or domain. + :param args: additional arguments for the scan + :return: A dictionary of open ports found on the target + + Example command line usage: + ``` + nmap -PR 192.168.178.1 + ``` """ - if (args): - assert (isinstance(args, str)), "Expected string got {0} instead".format(type(args)) + if args: + assert isinstance(args, str), "Expected string got {0} instead".format( + type(args) + ) xml_root = self.scan_command(self.arp_discovery, target=target, args=args) results = self.parser.filter_top_ports(xml_root) return results - def nmap_disable_dns(self, target, args=None): + def nmap_disable_dns( + self, target: str, args: typing.Optional[str] = None + ) -> typing.Dict[str, typing.Any]: """ Scan target using the nmap tcp connect - @cmd nmap -n 192.168.178.1 + + :param target: can be IP or domain. + :param args: additional arguments for the scan + :return: A dictionary of open ports found on the target + + Example command line usage: + ``` + nmap -n 192.168.178.1 + ``` """ - if (args): - assert (isinstance(args, str)), "Expected string got {0} instead".format(type(args)) + if args: + assert isinstance(args, str), "Expected string got {0} instead".format( + type(args) + ) xml_root = self.scan_command(self.disable_dns, target=target, args=args) results = self.parser.filter_top_ports(xml_root) return results -class NmapAsync(Nmap): - def __init__(self, path:str=''): + +class NmapAsync(BaseNmap): + """Implements an interface to allows the use of nmap port scanner tool from within python using asyncio""" + + def __init__(self, path: typing.Optional[str] = None) -> None: super(NmapAsync, self).__init__(path=path) self.stdout = asyncio.subprocess.PIPE self.stderr = asyncio.subprocess.PIPE - - async def run_command(self, cmd, timeout=None): + + async def run_command( + self, + cmd: typing.Union[str, typing.List[str]], + timeout: typing.Optional[float] = None, + ) -> str: + """ + Runs the nmap command using asyncio subprocess + + :param cmd: the command we want run, as a string or list + :param timeout: command subprocess timeout in seconds. + :return: The output of the command as a string + """ + if isinstance(cmd, list): + cmd = " ".join(cmd) + + # There is possibility of shell injection vulnerabilities due to shell expansion. + # Especially with unsanitized input or user-provided commands. + # But, the full shell functionality is needed here, + # so using `create_subprocess_exec` (safer) is not possible. process = await asyncio.create_subprocess_shell( - cmd, - stdout=self.stdout, - stderr=self.stderr - ) - + cmd, stdout=self.stdout, stderr=self.stderr + ) + logger.debug(f"Created subprocess with PID {process.pid!r}") try: data, stderr = await process.communicate() + except asyncio.TimeoutError: + await _terminate_asyncio_process(process, timeout=0.2) + raise NmapTimeoutError( + 'Command timed out after {timeout} seconds: "'.format(timeout=timeout) + + cmd + + '"' + ) + except asyncio.CancelledError: + await _terminate_asyncio_process(process, timeout=0.2) + raise # Re-propagate the CancelledError except Exception as e: - raise (e) + await _terminate_asyncio_process(process, timeout=0.2) + raise NmapExecutionError( + 'Error during command: "' + cmd + '"\n\n' + str(e) + ) from e else: - if 0 != process.returncode: + if process.returncode != 0: raise NmapExecutionError( - 'Error during command: "' + ' '.join(cmd) + '"\n\n' + \ - stderr.decode('utf8') - ) + 'Error during command: "' + cmd + '"\n\n' + stderr.decode("utf-8") + ) # Response is bytes so decode the output and return - return data.decode('utf8').strip() - - async def scan_command(self, target, arg, args=None, timeout=None): + return data.decode("utf-8").strip() + + async def scan_command( + self, + target: str, + arg: str, + args: typing.Optional[str] = None, + timeout: typing.Optional[float] = None, + ) -> ET.Element: + """ + Perform nmap scan using the specified scan type + + :param scan_type: The type of scan to perform (e.g., "-sS", "-sT", etc.) + :param target: The target to scan (IP address or domain) + :param args: Additional arguments for the scan + :param timeout: Timeout for the scan command in seconds + """ self.target = target command_args = "{target} {default}".format(target=target, default=arg) scancommand = self.default_command() + command_args - if (args): + if args: scancommand += " {0}".format(args) output = await self.run_command(scancommand, timeout=timeout) xml_root = self.get_xml_et(output) - return xml_root - - async def scan_top_ports(self, target, default=10, args=None, timeout=None): - top_port_args = " {target} --top-ports {default}".format(target=target, default=default) + + async def scan_top_ports( + self, + target: str, + default: int = 10, + args: typing.Optional[str] = None, + timeout: typing.Optional[float] = None, + ) -> typing.Dict[str, typing.Any]: + """ + Perform nmap's top ports scan + + :param target: can be IP or domain + :param default: is the default top port + + This top port requires root previledges + """ + if default > self.maxport: + raise ValueError("Port can not be greater than default 65535") + self.target = target + + if args: + assert isinstance(args, str), "Expected string got {0} instead".format( + type(args) + ) + + top_port_args = " {target} --top-ports {default}".format( + target=target, default=default + ) command = self.default_command() + top_port_args - if (args): + if args: command += " {0}".format(args) output = await self.run_command(command, timeout=timeout) if not output: + # Probaby and error was raise raise ValueError("Unable to perform requested command") - self.top_ports = self.parser.filter_top_ports(self.get_xml_et(output)) + # Begin parsing the xml response + xml_root = self.get_xml_et(output) + self.top_ports = self.parser.filter_top_ports(xml_root) return self.top_ports - - async def nmap_dns_brute_script(self, target, dns_brute="--script dns-brute.nse", args=None, timeout=None): + + async def nmap_dns_brute_script( + self, + target: str, + dns_brute: str = "--script dns-brute.nse", + args: typing.Optional[str] = None, + timeout: typing.Optional[float] = None, + ) -> typing.List[typing.Dict[str, typing.Any]]: + """ + Perform nmap scan using the dns-brute script + + :param target: can be IP or domain. + :param dns_brute: the dns-brute script to use + :param args: additional arguments for the scan + :param timeout: timeout for the scan command + :return: List of subdomains found by the dns-brute script + + Example usage: + ```python + from nmap3 import NmapScanTechniquesAsync + + nmap = NmapScanTechniquesAsync() + target = "nmmapper.com" + dns_brute = "--script dns-brute.nse" + args = "--script-args dns-brute.timeout=5" + timeout = 10 # seconds + subdomains = await nmap.nmap_dns_brute_script(target, dns_brute, args, timeout) + print(subdomains) + ``` + """ self.target = target dns_brute_args = "{target} {default}".format(target=target, default=dns_brute) dns_brute_command = self.default_command() + dns_brute_args - + if args: dns_brute_command += " {0}".format(args) - + # Run the command and get the output output = await self.run_command(dns_brute_command, timeout=timeout) - subdomains = self.parser.filter_subdomains(self.get_xml_et(output)) + + # Begin parsing the xml response + xml_root = self.get_xml_et(output) + subdomains = self.parser.filter_subdomains(xml_root) return subdomains - async def nmap_version_detection(self, target, arg="-sV", args=None, timeout=None): - xml_root = await self.scan_command(target=target, arg=arg, timeout=timeout) + async def nmap_version_detection( + self, + target: str, + arg: str = "-sV", + args: typing.Optional[str] = None, + timeout: typing.Optional[float] = None, + ) -> typing.Dict[str, typing.Any]: + """ + Perform nmap scan using the dns-brute script + + :param target: can be IP or domain. + :param arg: nmap argument for version detection, default is "-sV" + :param args: additional arguments for the scan + :param timeout: timeout for the scan command + :return: List of services and their versions found on the target + + Example command line usage: + ``` + nmap -oX - nmmapper.com --script dns-brute.nse + ``` + + Example usage: + ```python + from nmap3 import NmapScanTechniquesAsync + nmap = NmapScanTechniquesAsync() + + target = "nmmapper.com" + arg = "-sV" + args = "--script-args dns-brute.timeout=5" + timeout = 10 # seconds + services = await nmap.nmap_version_detection(target, arg, args, timeout) + print(services) + ``` + """ + xml_root = await self.scan_command( + target=target, arg=arg, args=args, timeout=timeout + ) services = self.parser.filter_top_ports(xml_root) return services - async def nmap_stealth_scan(self, target, arg="-Pn -sZ", args=None): + # Using of basic options for stealth scan + @user_is_root + async def nmap_stealth_scan( + self, target: str, arg: str = "-Pn -sZ", args: typing.Optional[str] = None + ) -> typing.Dict[str, typing.Any]: + """ + Perform nmap's stealth scan on the target + + NOTE: This ``nmap`` scan command requires root/administrator privileges + + :param target: can be IP or domain. + :param arg: nmap argument for stealth scan, default is "-Pn -sZ" + :param args: additional arguments for the scan + :return: List of top ports found on the target + + Example command line usage: + ``` + nmap -oX - nmmapper.com -Pn -sZ + ``` + """ xml_root = await self.scan_command(target=target, arg=arg, args=args) self.top_ports = self.parser.filter_top_ports(xml_root) return self.top_ports - async def nmap_os_detection(self, target, arg="-O", args=None): # requires root + @user_is_root + async def nmap_os_detection( + self, target: str, arg: str = "-O", args: typing.Optional[str] = None + ) -> typing.Dict[str, typing.Any]: # requires root + """ + Perform nmap's os detection on the target + + NOTE: This ``nmap`` scan command requires root/administrator privileges + + :param target: can be IP or domain. + :param arg: nmap argument for os detection, default is "-O" + :param args: additional arguments + + Example command line usage: + ``` + nmap -oX - nmmapper.com -O + ``` + NOTE: Requires root + """ xml_root = await self.scan_command(target=target, arg=arg, args=args) results = self.parser.os_identifier_parser(xml_root) return results - async def nmap_subnet_scan(self, target, arg="-p-", args=None): # requires root + async def nmap_subnet_scan( + self, target: str, arg: str = "-p-", args: typing.Optional[str] = None + ) -> typing.Dict[str, typing.Any]: # may require root + """ + Scan target using nmap's subnet scan + + NOTE: This ``nmap`` scan command may require root/administrator privileges + + :param target: can be IP or domain. + :param arg: nmap argument for subnet scan, default is "-p-" + :param args: additional arguments for the scan + :return: A dictionary of open ports found on the target + + Example command line usage: + ``` + nmap -oX - nmmapper.com -p- + ``` + """ xml_root = await self.scan_command(target=target, arg=arg, args=args) results = self.parser.filter_top_ports(xml_root) return results - async def nmap_list_scan(self, target, arg="-sL", args=None): # requires root + async def nmap_list_scan( + self, target: str, arg: str = "-sL", args: typing.Optional[str] = None + ) -> typing.Dict[str, typing.Any]: + """ + The list scan is a degenerate form of target discovery that simply lists each target of the network(s) + specified, without sending any packets to the target targets. + + NOTE: /usr/bin/nmap -oX - 192.168.178.1/24 -sL + + :param target: can be IP or domain. + :param arg: nmap argument for list scan, default is "-sL" + :param args: additional arguments for the scan + :return: List of targets found in the specified network + """ xml_root = await self.scan_command(target=target, arg=arg, args=args) results = self.parser.filter_top_ports(xml_root) return results -class NmapScanTechniquesAsync(NmapAsync,NmapScanTechniques): - def __init__(self, path:str=''): + +class NmapScanTechniquesAsync(NmapAsync): + """ + Extends `NmapAsync` to include nmap commands + with different scan techniques + + These scan techniques include: + + 1) TCP SYN Scan (-sS) + 2) TCP connect() scan (-sT) + 3) FIN Scan (-sF) + 4) Ping Scan (-sP) + 5) Idle Scan (-sI) + 6) UDP Scan (-sU) + 7) IP Scan (-sO) + """ + + def __init__(self, path: typing.Optional[str] = None): super(NmapScanTechniquesAsync, self).__init__(path=path) + self.sync_scan = "-sS" + self.tcp_connt = "-sT" + self.fin_scan = "-sF" + self.ping_scan = "-sP" + self.idle_scan = "-sL" self.udp_scan = "-sU" - - async def scan_command(self, scan_type, target, args, timeout=None): - def tpl(i): - scan_template = { - 1: self.fin_scan, - 2: self.sync_scan, - 3: self.tcp_connt, - 4: self.ping_scan, - 5: self.idle_scan, - 6: self.udp_scan, - 7: self.ip_scan - } - - return scan_template.get(i) - - for i in range(1, 8): - if scan_type == tpl(i): - scan = " {target} {default}".format(target=target, default=scan_type) - scan_type_command = self.default_command() + scan - - if (args): - scan_type_command += " {0}".format(args) - - output = await self.run_command(scan_type_command, timeout=timeout) - xml_root = self.get_xml_et(output) - - return xml_root - raise Exception("Something went wrong") - - async def nmap_udp_scan(self, target, args=None): - if (args): - assert (isinstance(args, str)), "Expected string got {0} instead".format(type(args)) + self.ip_scan = "-sO" + self.scan_types = { + self.fin_scan, + self.sync_scan, + self.tcp_connt, + self.ping_scan, + self.idle_scan, + self.udp_scan, + self.ip_scan, + } + + async def scan_command( # type: ignore[override] + self, + scan_type: str, + target: str, + args: typing.Optional[str] = None, + timeout: typing.Optional[float] = None, + ) -> ET.Element: + """ + Perform nmap scan using the specified scan type + + :param scan_type: The type of scan to perform (e.g., "-sS", "-sT", etc.) + :param target: The target to scan (IP address or domain) + :param args: Additional arguments for the scan + :param timeout: Timeout for the scan command in seconds + """ + if scan_type not in self.scan_types: + raise ValueError( + f"Invalid scan type: {scan_type}. Valid types are: {self.scan_types}" + ) + + scan = " {target} {default}".format(target=target, default=scan_type) + scan_type_command = self.default_command() + scan + + if args: + scan_type_command += " {0}".format(args) + + output = await self.run_command(scan_type_command, timeout=timeout) + xml_root = self.get_xml_et(output) + return xml_root + + async def nmap_udp_scan( + self, target: str, args: typing.Optional[str] = None + ) -> typing.Dict[str, typing.Any]: + """ + Scan target using the nmap tcp connect + + :param target: can be IP or domain. + :param args: additional arguments for the scan + :return: A dictionary of open ports found on the target + + Example command line usage: + ``` + nmap -sU 192.168.178.1 + ``` + """ + if args: + assert isinstance(args, str), "Expected string got {0} instead".format( + type(args) + ) xml_root = await self.scan_command(self.udp_scan, target=target, args=args) results = self.parser.filter_top_ports(xml_root) return results - + + @user_is_root + async def nmap_fin_scan( + self, target: str, args: typing.Optional[str] = None + ) -> typing.Dict[str, typing.Any]: + """ + Perform scan using nmap's fin scan + + NOTE: This ``nmap`` scan command requires root/administrator privileges + + :param target: can be IP or domain. + :param args: additional arguments for the scan + :return: A dictionary of open ports found on the target + + Example command line usage: + ``` + nmap -sF 192.168.178.1 + ``` + """ + xml_root = await self.scan_command(self.fin_scan, target=target, args=args) + results = self.parser.filter_top_ports(xml_root) + return results + + @user_is_root + async def nmap_syn_scan( + self, target: str, args: typing.Optional[str] = None + ) -> typing.Dict[str, typing.Any]: + """ + Perform syn scan on this given target + + NOTE: This ``nmap`` scan command requires root/administrator privileges + + :param target: can be IP or domain. + :param args: additional arguments for the scan + :return: A dictionary of open ports found on the target + + Example command line usage: + ``` + nmap -sS 192.168.178.1 + ``` + """ + xml_root = await self.scan_command(self.sync_scan, target=target, args=args) + results = self.parser.filter_top_ports(xml_root) + return results + + async def nmap_tcp_scan( + self, target: str, args: typing.Optional[str] = None + ) -> typing.Dict[str, typing.Any]: + """ + Scan target using the nmap tcp connect + + :param target: can be IP or domain. + :param args: additional arguments for the scan + :return: A dictionary of open ports found on the target + + Example command line usage: + ``` + nmap -sT 192.168.178.1 + ``` + """ + if args: + assert isinstance(args, str), "Expected string got {0} instead".format( + type(args) + ) + xml_root = await self.scan_command(self.tcp_connt, target=target, args=args) + results = self.parser.filter_top_ports(xml_root) + return results + + async def nmap_ping_scan( + self, target: str, args: typing.Optional[str] = None + ) -> typing.Dict[str, typing.Any]: + """ + Scan target using nmap's ping scan + + :param target: can be IP or domain. + :param args: additional arguments for the scan + :return: A dictionary of open ports found on the target + + Example command line usage: + ``` + nmap -sP 192.168.178.1 + ``` + """ + xml_root = await self.scan_command(self.ping_scan, target=target, args=args) + results = self.parser.filter_top_ports(xml_root) + return results + + async def nmap_idle_scan( + self, target: str, args: typing.Optional[str] = None + ) -> typing.Dict[str, typing.Any]: + """ + Scan target using nmap's idle scan + + :param target: can be IP or domain. + :param args: additional arguments for the scan + :return: A dictionary of open ports found on the target + + Example command line usage: + ``` + nmap -sL 192.168.178.1 + ``` + """ + xml_root = await self.scan_command(self.idle_scan, target=target, args=args) + results = self.parser.filter_top_ports(xml_root) + return results + + async def nmap_ip_scan( + self, target: str, args: typing.Optional[str] = None + ) -> typing.Dict[str, typing.Any]: + """ + Scan target using nmap's ip scan + + :param target: can be IP or domain. + :param args: additional arguments for the scan + :return: A dictionary of open ports found on the target + + Example command line usage: + ``` + nmap -sO 192.168.178.1 + ``` + """ + xml_root = await self.scan_command(self.ip_scan, target=target, args=args) + results = self.parser.filter_top_ports(xml_root) + return results + + if __name__ == "__main__": parser = argparse.ArgumentParser(prog="Python3 nmap") - parser.add_argument('-d', '--d', help='Help', required=True) + parser.add_argument("-d", "--target", help="Target IP or domain", required=True) args = parser.parse_args() - + nmap = NmapScanTechniquesAsync() - asyncio.run(nmap.nmap_udp_scan(target='127.0.0.1')) + # asyncio.run() wont work in the lowest python version supported `3.6` + # asyncio.run(nmap.nmap_udp_scan(target="127.0.0.1")) + loop = asyncio.get_event_loop() + try: + result = loop.run_until_complete(nmap.nmap_udp_scan(target=args.target)) + print(result) + except Exception as e: + print(f"An error occurred: {e}") + finally: + loop.close() diff --git a/nmap3/nmapparser.py b/nmap3/nmapparser.py index d842aca..27760ea 100644 --- a/nmap3/nmapparser.py +++ b/nmap3/nmapparser.py @@ -1,292 +1,352 @@ # nmap3.py -# +# # Copyright 2019 Wangolo Joel -# +# # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. -# +# # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. -# +# # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, # MA 02110-1301, USA. -# -# -import csv -import io -import os -import re -import shlex -import subprocess -import sys +# +# +from collections import defaultdict +import typing from xml.etree import ElementTree as ET + class NmapCommandParser(object): """ Object for parsing the xml results - + Each function below will correspond to the parse for each nmap command or option. """ - def __init__(self, xml_et): + + def __init__(self, xml_et: typing.Optional[ET.ElementTree]) -> None: self.xml_et = xml_et self.xml_root = None - - def filter_subdomains(self, xmlroot): + + def filter_subdomains( + self, xmlroot: ET.Element + ) -> typing.List[typing.Dict[str, typing.Optional[str]]]: """ - Given the xmlroot return the all the ports that are open from + Given the xmlroot return the all the ports that are open from that tree + + :param xmlroot: xml root element from nmap xml output + :return: List of subdomains found """ - try: - subdomains_list = [] - - scanned_host = xmlroot.find("host") - if scanned_host is not None: - hostscript = scanned_host.find("hostscript") - - script = None - first_table = None - final_result_table = None - - if hostscript is not None: - script = hostscript.find("script") - - if hostscript is not None: - first_table = script.find("table") - - if first_table is not None: - final_result_table = first_table.findall("table") - - if final_result_table is not None: - for table in final_result_table: - script_results = dict() - elem = table.findall("elem") - - if(len(elem) >= 2): - script_results[elem[0].attrib["key"]] = elem[0].text - script_results[elem[1].attrib["key"]] = elem[1].text - subdomains_list.append(script_results) - - except Exception as e: - raise(e) - else: + subdomains_list: typing.List[typing.Dict[str, typing.Optional[str]]] = [] + scanned_host = xmlroot.find("host") + if scanned_host is None: + return subdomains_list + + script = None + first_table = None + final_result_table = None + hostscript = scanned_host.find("hostscript") + + if hostscript is None: return subdomains_list - - def filter_top_ports(self, xmlroot): + + script = hostscript.find("script") + if script is None: + return subdomains_list + + first_table = script.find("table") + if first_table is None: + return subdomains_list + + final_result_table = first_table.findall("table") + if final_result_table is None: + return subdomains_list + + for table in final_result_table: + script_results = {} + elem = table.findall("elem") + + if len(elem) >= 2: + script_results[elem[0].attrib["key"]] = elem[0].text + script_results[elem[1].attrib["key"]] = elem[1].text + subdomains_list.append(script_results) + + return subdomains_list + + def filter_top_ports(self, xmlroot: ET.Element) -> typing.Dict[str, typing.Any]: """ Given the xmlroot return the all the ports that are open from that tree + + :param xmlroot: xml root element from nmap xml output + :return: List of open ports found """ - try: - port_result_dict = {} - - scanned_host = xmlroot.findall("host") - stats = xmlroot.attrib - - for hosts in scanned_host: - address = hosts.find("address").get("addr") - port_result_dict[address]={} # A little trick to avoid errors - - port_result_dict[address]["osmatch"]=self.parse_os(hosts) - port_result_dict[address]["ports"] = self.parse_ports(hosts) - port_result_dict[address]["hostname"] = self.parse_hostnames(hosts) - port_result_dict[address]["macaddress"] = self.parse_mac_address(hosts) - port_result_dict[address]["state"] = self.get_hostname_state(hosts) - - port_result_dict["runtime"]=self.parse_runtime(xmlroot) - port_result_dict["stats"]=stats - port_result_dict["task_results"]=self.parse_task_results(xmlroot) - - except Exception as e: - raise(e) - else: - return port_result_dict - - def os_identifier_parser(self, xmlroot): + port_result_dict: typing.Dict[str, typing.Any] = defaultdict(dict) + scanned_host = xmlroot.findall("host") + # This ensures we have a copy of the stats and we are not holding reference + # to the xmlroot.attrib, just in case we modify it later + stats = dict(xmlroot.attrib) + + for hosts in scanned_host: + address = hosts.find("address") + if address is None: + continue + addr = address.get("addr") + if addr is None: + continue + + port_result_dict[addr]["osmatch"] = self.parse_os(hosts) + port_result_dict[addr]["ports"] = self.parse_ports(hosts) + port_result_dict[addr]["hostname"] = self.parse_hostnames(hosts) + port_result_dict[addr]["macaddress"] = self.parse_mac_address(hosts) + port_result_dict[addr]["state"] = self.get_hostname_state(hosts) + + port_result_dict["runtime"] = self.parse_runtime(xmlroot) + port_result_dict["stats"] = stats + port_result_dict["task_results"] = self.parse_task_results(xmlroot) + return dict(port_result_dict) + + def os_identifier_parser(self, xmlroot: ET.Element) -> typing.Dict[str, typing.Any]: """ Parser for identified os + + :param xmlroot: xml root element from nmap xml output + :return: Dictionary of os identified """ - try: - os_identified = [] - os_dict = {} - hosts = xmlroot.findall("host") - stats = xmlroot.attrib - - for host in hosts: - address = host.find("address").get("addr") - os_dict[address]={} - - os_dict[address]["osmatch"]=self.parse_os(host) - os_dict[address]["ports"] = self.parse_ports(host) - os_dict[address]["hostname"] = self.parse_hostnames(host) - os_dict[address]["macaddress"] = self.parse_mac_address(host) - - os_dict["runtime"]=self.parse_runtime(xmlroot) - os_dict["stats"]=stats - os_dict["task_results"]=self.parse_task_results(xmlroot) - return os_dict - - except Exception as e: - raise(e) - else: - return os_identified - - def parse_os(self, os_results): + os_dict: typing.Dict[str, typing.Any] = defaultdict(dict) + hosts = xmlroot.findall("host") + stats = dict(xmlroot.attrib) + + for host in hosts: + address = host.find("address") + if address is None: + continue + addr = address.get("addr") + if addr is None: + continue + + os_dict[addr]["osmatch"] = self.parse_os(host) + os_dict[addr]["ports"] = self.parse_ports(host) + os_dict[addr]["hostname"] = self.parse_hostnames(host) + os_dict[addr]["macaddress"] = self.parse_mac_address(host) + + os_dict["runtime"] = self.parse_runtime(xmlroot) + os_dict["stats"] = stats + os_dict["task_results"] = self.parse_task_results(xmlroot) + return dict(os_dict) + + def parse_os( + self, os_results: ET.Element + ) -> typing.List[typing.Dict[str, typing.Any]]: """ - parses os results + Parses os results + + :param os_results: xml root element from nmap xml output + :return: List of os identified """ os = os_results.find("os") - os_list = [] - - if os is not None: - for match in os.findall("osmatch"): - attrib = match.attrib - - for osclass in match.findall("osclass"): - attrib["osclass"]=osclass.attrib - - for cpe in osclass.findall("cpe"): - attrib["cpe"]=cpe.text - os_list.append(attrib) + os_list: typing.List[typing.Dict[str, typing.Any]] = [] + + if os is None: return os_list - else: - return {} - - def parse_ports(self, xml_hosts): + + for match in os.findall("osmatch"): + attrib: typing.Dict[str, typing.Any] = dict(match.attrib) + + for osclass in match.findall("osclass"): + attrib["osclass"] = dict(osclass.attrib) + + for cpe in osclass.findall("cpe"): + attrib["cpe"] = cpe.text + os_list.append(attrib) + return os_list + + def parse_ports( + self, xml_hosts: ET.Element + ) -> typing.List[typing.Dict[str, typing.Any]]: """ - Parse parts from xml + Parse ports from xml + + :param xml_hosts: xml root element from nmap xml output + :return: List of open ports found """ open_ports_list = [] - + for port in xml_hosts.findall("ports/port"): - open_ports = {} - for key in port.attrib: - open_ports[key]=port.attrib.get(key) - - if(port.find('state') is not None): - for key in port.find('state').attrib: - open_ports[key]=port.find("state").attrib.get(key) - - if(port.find('service') is not None): - open_ports["service"]=port.find("service").attrib + open_ports: typing.Dict[str, typing.Any] = {} + open_ports.update(port.attrib) + + state = port.find("state") + if state is not None: + open_ports.update(state.attrib) + + service = port.find("service") + if service is not None: + open_ports["service"] = dict(service.attrib) cpe_list = [] - for cp in port.find("service").findall("cpe"): - + for cp in service.findall("cpe"): cpe_list.append({"cpe": cp.text}) open_ports["cpe"] = cpe_list - + # Script - open_ports["scripts"]=self.parse_scripts(port.findall('script')) if port.findall('script') is not None else [] + open_ports["scripts"] = ( + self.parse_scripts(port.findall("script")) + if port.findall("script") is not None + else [] + ) open_ports_list.append(open_ports) - + return open_ports_list - - def parse_runtime(self, xml): + + def parse_runtime(self, xml: ET.Element) -> typing.Optional[typing.Dict[str, str]]: """ - Parse parts from xml + Parse runtime from xml + + :param xml: xml root element from nmap xml output + :return: Dictionary with runtime attributes """ runstats = xml.find("runstats") - runtime = {} - if runstats is not None: - if runstats.find("finished") is not None: - return runstats.find("finished").attrib + finished = runstats.find("finished") + if finished is not None: + return dict(finished.attrib) + return None - def parse_task_results(self, xml): + def parse_task_results(self, xml: ET.Element) -> typing.List[typing.Dict[str, str]]: """ - Parse parts from xml + Parse task results from xml + + :param xml: xml root element from nmap xml output + :return: List of task results found """ - task_results = xml.findall('taskend') + task_results = xml.findall("taskend") task_results_list = [] for task_result in task_results: - task_results_list.append(task_result.attrib) + task_results_list.append(dict(task_result.attrib)) return task_results_list - - def parse_mac_address(self, xml): + + def parse_mac_address( + self, xml: ET.Element + ) -> typing.Optional[typing.Dict[str, str]]: """ - Parse parts from xml + Parse mac address from xml + + :param xml: xml root element from nmap xml output + :return: Dictionary with mac address attributes or None if not found """ addresses = xml.findall("address") - + for addr in addresses: - if(addr.attrib.get("addrtype") == "mac"): - return addr.attrib - - def parse_hostnames(self, host): + if addr.attrib.get("addrtype") == "mac": + return dict(addr.attrib) + return None + + def parse_hostnames(self, host: ET.Element) -> typing.List[typing.Dict[str, str]]: """ - Parse parts from xml + Parse hostnames from xml + + :param host: xml root element from nmap xml output + :return: List of hostnames found """ hostnames = host.findall("hostnames/hostname") hostnames_list = [] - - for host in hostnames: - hostnames_list.append(host.attrib) + + for hostname in hostnames: + hostnames_list.append(dict(hostname.attrib)) return hostnames_list - - def get_hostname_state(self, xml): + + def get_hostname_state( + self, xml: ET.Element + ) -> typing.Optional[typing.Dict[str, str]]: """ - Parse parts from xml + Parse hostname state from xml + + :param xml: xml root element from nmap xml output + :return: Dictionary with hostname state attributes or None if not found """ state = xml.find("status") - if(state is not None): - return state.attrib - - def parse_scripts(self, scripts_xml): + if state is not None: + return dict(state.attrib) + return None + + def parse_scripts( + self, scripts_xml: typing.List[ET.Element] + ) -> typing.List[typing.Dict[str, typing.Any]]: + """ + Parse scripts from xml + + :param scripts_xml: List of xml elements containing script data + :return: List of dictionaries containing script name, raw output, and data + """ + if not scripts_xml: + return [] scripts = [] for script_xml in scripts_xml: - script_name = script_xml.attrib.get('id') - raw_output = script_xml.attrib.get('output') + script_name = script_xml.attrib.get("id") + raw_output = script_xml.attrib.get("output") data = self.convert_xml_elements(script_xml) - if script_xml.findall('table') is not None: - tables = script_xml.findall('table') + tables = script_xml.findall("table") + if tables is not None: child_data = self.convert_xml_tables(tables) - for k in child_data: - if {} != k: - data[k] = child_data[k] - - scripts.append({ - 'name': script_name, - 'raw': raw_output, - 'data': data - }) + for key, value in child_data.items(): + if key: + data[key] = value + scripts.append({"name": script_name, "raw": raw_output, "data": data}) return scripts - def convert_xml_tables(self, xml_tables): - data = {} + def convert_xml_tables( + self, xml_tables: typing.List[ET.Element] + ) -> typing.Dict[str, typing.Any]: + """ + Convert XML tables to a dictionary format. + + :param xml_tables: List of XML table elements to convert + :return: Dictionary representation of the XML tables + """ + data: typing.Dict[str, typing.Any] = {} for xml_table in xml_tables: - key = xml_table.attrib.get('key') + key = xml_table.attrib.get("key") child_data = self.convert_xml_elements(xml_table) if key is None: - if {} != child_data: - a = data.get('children', []) - data['children'] = a + [child_data] + if child_data: + a = data.get("children", []) + data["children"] = a + [child_data] else: - if xml_table.findall('table') is not None: - data[key] = self.convert_xml_tables(xml_table.findall('table')) - if {} != child_data: + tables = xml_table.findall("table") + if tables is not None: + data[key] = self.convert_xml_tables(tables) + if child_data: a = data.get(key, {}) - b = a.get('children', []) - a['children'] = b + [child_data] - + b = a.get("children", []) + a["children"] = b + [child_data] return data - def convert_xml_elements(self, xml_obj): + def convert_xml_elements( + self, xml_obj: ET.Element + ) -> typing.Dict[str, typing.Optional[str]]: + """ + Convert XML elements to a dictionary format. + + :param xml_obj: XML element to convert + :return: Dictionary representation of the XML elements + """ elements = {} - elem_counter = 0 - for elem in xml_obj.findall('elem'): - if None == elem.attrib.get('key'): - elements[elem_counter] = elem.text + for counter, element in enumerate(xml_obj.findall("element")): + key = element.attrib.get("key") + if key is None: + elements[str(counter)] = element.text else: - elements[elem.attrib.get('key')] = elem.text - elem_counter += 1 + elements[key] = element.text return elements diff --git a/nmap3/utils.py b/nmap3/utils.py index 499de60..e15f2ed 100644 --- a/nmap3/utils.py +++ b/nmap3/utils.py @@ -20,83 +20,204 @@ # import shlex import subprocess -import sys import os -import ctypes +import sys import functools +import logging +import typing +import asyncio +import shutil +import ctypes + +from nmap3.exceptions import ( + NmapExecutionError, + NmapNotInstalledError, + NmapPrivilegeError, +) + +__author__ = "Wangolo Joel (inquiry@nmapper.com)" +__version__ = "1.9.3" +__last_modification__ = "Jul/14/2025" + +logger = logging.getLogger(__name__) -from nmap3.exceptions import NmapNotInstalledError +T = typing.TypeVar("T") +R = typing.TypeVar("R") -__author__ = 'Wangolo Joel (inquiry@nmapper.com)' -__version__ = '1.9.3' -__last_modification__ = 'Jun/06/2025' -def get_nmap_path(path:str='') -> str: +def get_nmap_path(path: typing.Optional[str] = None) -> str: """ - Accepts path, validate it. If not valide, search nmap path + Accepts path, validate it. If not valid, search nmap path Returns the location path where nmap is installed by calling which nmap - If not found raises NmapNotInstalledError + + :param path: Optional path to nmap binary + :return: Path to nmap binary + :raises NmapNotInstalledError: If nmap is not installed or path is not valid """ if path and (os.path.exists(path)): return path - os_type = sys.platform - if os_type == 'win32': - cmd = "where nmap" - else: - cmd = "which nmap" - args = shlex.split(cmd) - sub_proc = subprocess.Popen(args, stdout=subprocess.PIPE) - - output, e = sub_proc.communicate(timeout=15) - if e: - print(e) - + output = shutil.which("nmap") if not output: raise NmapNotInstalledError(path=path) - if os_type == 'win32': - return output.decode('utf8').strip().replace("\\", "/") - return output.decode('utf8').strip() + return output.strip().replace("\\", "/") + -def get_nmap_version(): +def get_nmap_version() -> typing.Optional[str]: + """ + Returns the version of nmap installed on the system + + :return: Version of nmap installed or None if an error occurs + :raises NmapNotInstalledError: If nmap is not installed + """ nmap = get_nmap_path() cmd = nmap + " --version" args = shlex.split(cmd) - sub_proc = subprocess.Popen(args, stdout=subprocess.PIPE) + process = subprocess.Popen(args, stdout=subprocess.PIPE) try: - output, _ = sub_proc.communicate(timeout=15) + output, _ = process.communicate(timeout=15) except Exception as e: - print(e) - sub_proc.kill() - else: - return output.decode('utf8').strip() + logger.error(f"Error while trying to get nmap version: {e}", exc_info=True) + _terminate_process(process, timeout=0.2) + return None + return output.decode("utf8").strip() + -def user_is_root(func): - def wrapper(*args, **kwargs): +PRIVILEGE_DENIED_KEYWORDS = { + "root privileges", + "administrator", + "permission denied", + "operation not permitted", +} + + +if sys.version_info >= (3, 10) or (sys.version_info >= (3, 7) and typing.TYPE_CHECKING): + # For Python 3.10+ or when type checking, use proper ParamSpec typing + try: + from typing import ParamSpec + + P = ParamSpec("P") # type: ignore[no-redef] + + @typing.overload # type: ignore[no-overload-impl] + def requires_root_privilege( + func: typing.Callable[P, typing.Awaitable[T]], + ) -> typing.Callable[P, typing.Awaitable[T]]: ... + + @typing.overload + def requires_root_privilege( + func: typing.Callable[P, T], + ) -> typing.Callable[P, T]: ... + + except ImportError: + # Fallback if ParamSpec is not available try: - is_root_or_admin = (os.getuid() == 0) - except AttributeError: - is_root_or_admin = ctypes.windll.shell32.IsUserAnAdmin() != 0 - - if(is_root_or_admin): - return func(*args, **kwargs) - else: - return {"error":True, "msg":"You must be root/administrator to continue!"} - return wrapper - -def nmap_is_installed_async(): - def wrapper(func): + from typing_extensions import ParamSpec # type: ignore[assignment] + + P = ParamSpec("P") # type: ignore[no-redef] + + @typing.overload # type: ignore[no-overload-impl,no-redef] + def requires_root_privilege( # type: ignore[no-redef] + func: typing.Callable[P, typing.Awaitable[T]], + ) -> typing.Callable[P, typing.Awaitable[T]]: ... + + @typing.overload + def requires_root_privilege( # type: ignore[no-redef] + func: typing.Callable[P, T], + ) -> typing.Callable[P, T]: ... + + except ImportError: + # No overloads for very old Python versions + pass + + +if os.name == "nt": + def _is_windows_admin() -> bool: + return ctypes.windll.shell32.IsUserAnAdmin() != 0 # type: ignore[attr-defined] + + +def requires_root_privilege( # type: ignore[no-redef] + func: typing.Callable[..., typing.Any], +) -> typing.Callable[..., typing.Any]: + """ + Decorator that marks a function as requiring root privileges. + + If the function is called without root privileges, it catches `NmapExecutionError` + and raises `NmapPrivilegeError` with a message indicating insufficient privilege. + + Works with both sync and async functions. + """ + if asyncio.iscoroutinefunction(func): + @functools.wraps(func) - async def wrapped(*args, **kwargs): - nmap_path = get_nmap_path() - - if(os.path.exists(nmap_path)): + async def async_wrapper(*args: typing.Any, **kwargs: typing.Any) -> typing.Any: + if os.name == "nt" and not _is_windows_admin(): + raise NmapPrivilegeError( + "You must be root/administrator to continue!" + ) + # Proceed for other platforms as the user may already + # be running the program with 'sudo' or 'doas' privileges + try: return await func(*args, **kwargs) - else: - print({"error":True, "msg":"Nmap has not been install on this system yet!"}) - return {"error":True, "msg":"Nmap has not been install on this system yet!"} - return wrapped - return wrapper + except NmapExecutionError as e: + msg = str(e).lower() + if any(keyword in msg for keyword in PRIVILEGE_DENIED_KEYWORDS): + raise NmapPrivilegeError( + "You must be root/administrator to continue!" + ) from e + raise e + + return async_wrapper + + else: + + @functools.wraps(func) + def sync_wrapper(*args: typing.Any, **kwargs: typing.Any) -> typing.Any: + if os.name == "nt" and not _is_windows_admin(): + raise NmapPrivilegeError( + "You must be root/administrator to continue!" + ) + + # Proceed for other platforms + try: + return func(*args, **kwargs) + except NmapExecutionError as e: + msg = str(e).lower() + if any(keyword in msg for keyword in PRIVILEGE_DENIED_KEYWORDS): + raise NmapPrivilegeError( + "You must be root/administrator to continue!" + ) from e + raise e + + return sync_wrapper + + +user_is_root = requires_root_privilege # Alias for backward compatibility + + +def _terminate_process(process: subprocess.Popen, timeout: float = 0.5) -> None: + """Terminate a (sub) process gracefully""" + try: + process.terminate() + process.wait( + timeout=timeout + ) # Wait to reap the process and avoid 'zombie' state + except subprocess.TimeoutExpired: + logger.warning(f"Process {process.pid!r} did not terminate gracefully") + process.kill() + + +async def _terminate_asyncio_process( + process: asyncio.subprocess.Process, timeout: float = 0.5 +) -> None: + """Terminate a asyncio (sub) process gracefully""" + try: + process.terminate() + await asyncio.wait_for( + process.wait(), timeout=1.0 + ) # Wait to reap the process and avoid 'zombie' state + except asyncio.TimeoutError: + logger.warning(f"Process {process.pid!r} did not terminate gracefully") + process.kill() diff --git a/setup.py b/setup.py index ea792b5..1769c24 100644 --- a/setup.py +++ b/setup.py @@ -4,7 +4,7 @@ long_description = fh.read() setuptools.setup( - name="python3-nmap", + name="python3-nmap", version="1.9.3", author="nmmapper", author_email="info@nmmapper.com", @@ -13,19 +13,19 @@ long_description_content_type="text/markdown", url="https://github.com/nmmapper/python3-nmap", project_urls={ - 'Documentation': 'https://nmap.readthedocs.io/en/latest/', - 'How it is used': 'https://www.nmmapper.com/sys/networkmapper/nmap/online-port-scanning/', - 'Homepage': 'https://www.nmmapper.com/', - 'Source': 'https://github.com/nmmapper/python3-nmap', - 'Subdomain finder': 'https://www.nmmapper.com/sys/tools/subdomainfinder/', - 'theHarvester online': 'https://www.nmmapper.com/sys/theharvester/email-harvester-tool/online/', + "Documentation": "https://nmap.readthedocs.io/en/latest/", + "How it is used": "https://www.nmmapper.com/sys/networkmapper/nmap/online-port-scanning/", + "Homepage": "https://www.nmmapper.com/", + "Source": "https://github.com/nmmapper/python3-nmap", + "Subdomain finder": "https://www.nmmapper.com/sys/tools/subdomainfinder/", + "theHarvester online": "https://www.nmmapper.com/sys/theharvester/email-harvester-tool/online/", }, packages=setuptools.find_packages(), classifiers=[ "Programming Language :: Python :: 3", "Operating System :: OS Independent", ], - python_requires='>=3.6', - setup_requires=['wheel'], - install_requires=['simplejson'], + python_requires=">=3.6", + setup_requires=["wheel"], + install_requires=["simplejson"], )