From 0f308da4d0d51d2a9c5faf7b6a56500f7710ec00 Mon Sep 17 00:00:00 2001 From: bilditup1 Date: Thu, 16 Nov 2023 21:44:57 -0500 Subject: [PATCH 01/10] Support for >260 char filenames on Windows --- host/nxdt_host.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/host/nxdt_host.py b/host/nxdt_host.py index 2287e442..11c6dc5d 100644 --- a/host/nxdt_host.py +++ b/host/nxdt_host.py @@ -787,6 +787,10 @@ def usbHandleSendFileProperties(cmd_block: bytes) -> int | None: # Generate full, absolute path to the destination file. fullpath = os.path.abspath(g_outputDir + os.path.sep + filename) + # Unconditionally enable 32-bit paths on Windows. + if g_isWindows: + fullpath = '\\\\?\\' + fullpath.replace("/", "\\") + # Get parent directory path. dirpath = os.path.dirname(fullpath) @@ -822,6 +826,11 @@ def usbHandleSendFileProperties(cmd_block: bytes) -> int | None: # Retrieve what we need using global variables. file = g_nspFile fullpath = g_nspFilePath + + # Unconditionally enable 32-bit paths on Windows. + if g_isWindows: + fullpath = '\\\\?\\' + fullpath.replace("/", "\\") + dirpath = os.path.dirname(fullpath) # Check if we're dealing with an empty file or with the first SendFileProperties command from a NSP. From 13a9f86fe93b453724e75eae0c6e4427aa31c4af Mon Sep 17 00:00:00 2001 From: bilditup1 Date: Mon, 20 Nov 2023 21:29:14 -0500 Subject: [PATCH 02/10] Added logging to file (gui) --- host/nxdt_host.py | 124 ++++++++++++++++++++++++++++++++-------------- 1 file changed, 87 insertions(+), 37 deletions(-) diff --git a/host/nxdt_host.py b/host/nxdt_host.py index 11c6dc5d..579602a7 100644 --- a/host/nxdt_host.py +++ b/host/nxdt_host.py @@ -60,6 +60,9 @@ from io import BufferedWriter from typing import List, Tuple, Any, Callable, Optional +from datetime import datetime + + # Scaling factors. WINDOWS_SCALING_FACTOR = 96.0 SCALE = 1.0 @@ -303,8 +306,10 @@ # Global variables used throughout the code. g_cliMode: bool = False g_outputDir: str = '' -g_logLevelIntVar: Optional[tk.IntVar] = None +g_logPath: str = '' +g_logLevelIntVar: Optional[tk.IntVar] = None +g_logToFileBoolVar: Optional[tk.BooleanVar] = None g_osType: str = '' g_osVersion: str = '' @@ -357,7 +362,12 @@ def emit(self, record: logging.LogRecord) -> None: msg = self.format(record) print(msg) else: - self.log_queue.put(record) + self.log_queue.put(record) + if g_logToFileBoolVar.get(): + logToFileMsg=datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]+'\t['+ \ + record.levelname+']\t'+self.format(record).strip()+'\n' + with open (g_logPath, 'a', encoding="utf-8") as f: + f.write(logToFileMsg) # Reference: https://beenje.github.io/blog/posts/logging-to-a-tkinter-scrolledtext-widget. class LogConsole: @@ -388,7 +398,7 @@ def display(self, record: logging.LogRecord) -> None: self.scrolled_text.insert(tk.END, msg + '\n', record.levelname) self.scrolled_text.configure(state='disabled') self.scrolled_text.yview(tk.END) - + def poll_log_queue(self) -> None: # Check every 100 ms if there is a new message in the queue to display. while True: @@ -566,7 +576,45 @@ def utilsGetPath(path_arg: str, fallback_path: str, is_file: bool, create: bool raise Exception(f'Error: "{path}" points to an invalid file/directory.') return path + +# Prepends `\\?\` to enable ~64KiB long paths in Windows. +# ref0: https://learn.microsoft.com/en-us/windows/win32/fileio/naming-a-file?redirectedfrom=MSDN#win32-file-namespaces +# ref1: https://learn.microsoft.com/en-us/windows/win32/fileio/maximum-file-path-limitation?tabs=registry +# ref2: https://stackoverflow.com/a/15373771 +# Replaces '/' separator with proper '\' in case running under MSYS2 env. + +def utilsGetWinFullPath(path_arg: str) -> str: + return '\\\\?\\' + path_arg.replace("/", "\\") + +def utilsResetOutputDir() -> None: + global g_outputDir + global g_logPath + #assert g_tkDirText is not None + g_outputDir = g_tkDirText.get('1.0', tk.END).strip() + if not g_outputDir: + # We should never reach this, honestly. + messagebox.showerror('Error', 'You must provide an output directory!', parent=g_tkRoot) + return + + if g_logToFileBoolVar.get(): + g_logPath = os.path.abspath(g_outputDir + os.path.sep + \ + "nxdt_host_" + datetime.now().strftime('%Y-%m-%d_%H%M%S') + '.log') + if g_isWindows: + g_logPath = utilsGetWinFullPath(g_logPath) + print(g_logPath) + else: + print("Not logging to file.") + + # Make sure the full directory tree exists. + try: + os.makedirs(g_outputDir, exist_ok=True) + except: + utilsLogException(traceback.format_exc()) + messagebox.showerror('Error', 'Unable to create full output directory tree!', parent=g_tkRoot) + return + + return def utilsIsValueAlignedToEndpointPacketSize(value: int) -> bool: return bool((value & (g_usbEpMaxPacketSize - 1)) == 0) @@ -787,9 +835,9 @@ def usbHandleSendFileProperties(cmd_block: bytes) -> int | None: # Generate full, absolute path to the destination file. fullpath = os.path.abspath(g_outputDir + os.path.sep + filename) - # Unconditionally enable 32-bit paths on Windows. + # Unconditionally enable long paths in Windows. if g_isWindows: - fullpath = '\\\\?\\' + fullpath.replace("/", "\\") + fullpath = utilsGetWinFullPath(fullpath); # Get parent directory path. dirpath = os.path.dirname(fullpath) @@ -827,10 +875,6 @@ def usbHandleSendFileProperties(cmd_block: bytes) -> int | None: file = g_nspFile fullpath = g_nspFilePath - # Unconditionally enable 32-bit paths on Windows. - if g_isWindows: - fullpath = '\\\\?\\' + fullpath.replace("/", "\\") - dirpath = os.path.dirname(fullpath) # Check if we're dealing with an empty file or with the first SendFileProperties command from a NSP. @@ -1125,29 +1169,26 @@ def uiStopServer() -> None: # Signal the shared stop event. #assert g_stopEvent is not None g_stopEvent.set() + # Log the end of the session. + g_logger.info("\nServer stopped.\n") + def uiStartServer() -> None: - global g_outputDir - #assert g_tkDirText is not None - - g_outputDir = g_tkDirText.get('1.0', tk.END).strip() - if not g_outputDir: - # We should never reach this, honestly. - messagebox.showerror('Error', 'You must provide an output directory!', parent=g_tkRoot) - return - - # Make sure the full directory tree exists. - try: - os.makedirs(g_outputDir, exist_ok=True) - except: - utilsLogException(traceback.format_exc()) - messagebox.showerror('Error', 'Unable to create full output directory tree!', parent=g_tkRoot) - return + # Reset output path variable based on possible user input unconditionally. + utilsResetOutputDir() # Update UI. uiToggleElements(False) + # Log basic session status and info. + g_logger.info('\nServer started.') + g_logger.info('Output Dir:\t' + g_outputDir) + if g_logToFileBoolVar.get(): + g_logger.info('Logging to:\t' + g_logPath.rsplit('\\', 1)[-1] + '\n') + else: + g_logger.info('Logging to file disabled.\n') + # Create background server thread. server_thread = threading.Thread(target=usbCommandHandler, daemon=True) server_thread.start() @@ -1157,6 +1198,7 @@ def uiToggleElements(flag: bool) -> None: #assert g_tkChooseDirButton is not None #assert g_tkServerButton is not None #assert g_tkCanvas is not None + #assert g_tkLogToFileCheckbox is not None #assert g_tkVerboseCheckbox is not None if flag: @@ -1165,7 +1207,7 @@ def uiToggleElements(flag: bool) -> None: g_tkChooseDirButton.configure(state='normal') g_tkServerButton.configure(text='Start server', command=uiStartServer, state='normal') g_tkCanvas.itemconfigure(g_tkTipMessage, state='hidden', text='') - + g_tkLogToFileCheckbox.configure(state='normal') g_tkVerboseCheckbox.configure(state='normal') else: #assert g_tkScrolledTextLog is not None @@ -1179,7 +1221,7 @@ def uiToggleElements(flag: bool) -> None: g_tkScrolledTextLog.configure(state='normal') g_tkScrolledTextLog.delete('1.0', tk.END) g_tkScrolledTextLog.configure(state='disabled') - + g_tkLogToFileCheckbox.configure(state='disabled') g_tkVerboseCheckbox.configure(state='disabled') def uiChooseDirectory() -> None: @@ -1210,15 +1252,15 @@ def uiHandleVerboseCheckbox() -> None: g_logger.setLevel(g_logLevelIntVar.get()) def uiInitialize() -> None: - global SCALE, g_logLevelIntVar - global g_tkRoot, g_tkCanvas, g_tkDirText, g_tkChooseDirButton, g_tkServerButton, g_tkTipMessage, g_tkScrolledTextLog, g_tkVerboseCheckbox + global SCALE, g_logLevelIntVar, g_logToFileBoolVar + global g_tkRoot, g_tkCanvas, g_tkDirText, g_tkChooseDirButton, g_tkServerButton, g_tkTipMessage, g_tkScrolledTextLog, g_tkLogToFileCheckbox, g_tkVerboseCheckbox global g_stopEvent, g_tlb, g_taskbar, g_progressBarWindow # Setup thread event. g_stopEvent = threading.Event() # Enable high DPI scaling under Windows (if possible). - # This will remove the blur caused by bilineal filtering when automatic scaling is carried out by Windows itself. + # This will remove the blur caused by bilinear filtering when automatic scaling is carried out by Windows itself. dpi_aware = False if g_isWindowsVista: try: @@ -1299,7 +1341,7 @@ def uiInitialize() -> None: g_tkCanvas = tk.Canvas(g_tkRoot, width=window_width_px, height=window_height_px) g_tkCanvas.pack() - g_tkCanvas.create_text(uiScaleMeasure(60), uiScaleMeasure(30), text='Output directory:', anchor=tk.CENTER) + g_tkCanvas.create_text(uiScaleMeasure(60), uiScaleMeasure(30), text='Output Dir:', anchor=tk.CENTER) g_tkDirText = tk.Text(g_tkRoot, height=1, width=45, font=default_font, wrap='none', state='disabled', bg='#F0F0F0') uiUpdateDirectoryField(g_outputDir) @@ -1322,15 +1364,22 @@ def uiInitialize() -> None: g_tkScrolledTextLog.tag_config('CRITICAL', foreground='red', underline=True) g_tkCanvas.create_window(uiScaleMeasure(int(WINDOW_WIDTH / 2)), uiScaleMeasure(280), window=g_tkScrolledTextLog, anchor=tk.CENTER) - g_tkCanvas.create_text(uiScaleMeasure(5), uiScaleMeasure(WINDOW_HEIGHT - 10), text=COPYRIGHT_TEXT, anchor=tk.W) + g_tkCanvas.create_text(uiScaleMeasure(5), uiScaleMeasure(WINDOW_HEIGHT - 13), text=COPYRIGHT_TEXT, anchor=tk.W) + + g_logToFileBoolVar = tk.BooleanVar() + g_tkLogToFileCheckbox = tk.Checkbutton(g_tkRoot, text='Log to file', variable=g_logToFileBoolVar,onvalue=True, offvalue=False) + g_tkLogToFileCheckbox.select() + + g_tkCanvas.create_window(uiScaleMeasure(WINDOW_WIDTH - 165), uiScaleMeasure(WINDOW_HEIGHT - 13), window=g_tkLogToFileCheckbox, anchor=tk.CENTER) g_logLevelIntVar = tk.IntVar() g_tkVerboseCheckbox = tk.Checkbutton(g_tkRoot, text='Verbose output', variable=g_logLevelIntVar, onvalue=logging.DEBUG, offvalue=logging.INFO, command=uiHandleVerboseCheckbox) - g_tkCanvas.create_window(uiScaleMeasure(WINDOW_WIDTH - 55), uiScaleMeasure(WINDOW_HEIGHT - 10), window=g_tkVerboseCheckbox, anchor=tk.CENTER) + + g_tkCanvas.create_window(uiScaleMeasure(WINDOW_WIDTH - 55), uiScaleMeasure(WINDOW_HEIGHT - 13), window=g_tkVerboseCheckbox, anchor=tk.CENTER) # Initialize console logger. console = LogConsole(g_tkScrolledTextLog) - + # Initialize progress bar window object. bar_format = '{desc}\n\n{percentage:.2f}% - {n:.2f} / {total:.2f} {unit}\nElapsed time: {elapsed}. Remaining time: {remaining}.\nSpeed: {rate_fmt}.' g_progressBarWindow = ProgressBarWindow(bar_format, g_tkRoot, 'File transfer', False, uiHandleExitProtocolStub) @@ -1341,7 +1390,7 @@ def uiInitialize() -> None: def cliInitialize() -> None: global g_progressBarWindow - + #assert g_logger is not None # Initialize console logger. @@ -1353,7 +1402,7 @@ def cliInitialize() -> None: # Print info. g_logger.info('\n' + SCRIPT_TITLE + '. ' + COPYRIGHT_TEXT + '.') - g_logger.info('Output directory: "' + g_outputDir + '".\n') + g_logger.info('Output Dir: "' + g_outputDir + '".\n') # Start USB command handler directly. usbCommandHandler() @@ -1414,7 +1463,8 @@ def main() -> int: ret = main() except KeyboardInterrupt: time.sleep(0.2) - print('\nScript interrupted.') + print("\nHost script interrupted!") + except Exception as e: utilsLogException(traceback.format_exc()) From 4e6f91eaa5f3715054f01f5245f4b5de0715d7b1 Mon Sep 17 00:00:00 2001 From: bilditup1 Date: Sun, 26 Nov 2023 15:09:29 -0500 Subject: [PATCH 03/10] cli logging w/time+color, refactoring, remember last dir --- host/nxdt_host.py | 472 +++++++++++++++++++++++++++++++++------------- 1 file changed, 343 insertions(+), 129 deletions(-) diff --git a/host/nxdt_host.py b/host/nxdt_host.py index 579602a7..96a96406 100644 --- a/host/nxdt_host.py +++ b/host/nxdt_host.py @@ -55,13 +55,22 @@ from tqdm import tqdm -from argparse import ArgumentParser +from argparse import ArgumentParser, RawTextHelpFormatter, ArgumentDefaultsHelpFormatter from io import BufferedWriter from typing import List, Tuple, Any, Callable, Optional from datetime import datetime +# Terminal colors using ANSI escape sequences. +# ref: https://gist.github.com/fnky/458719343aabd01cfb17a3a4f7296797 +COLOR_BACKGROUND = "\033[40m\033[0K" # black +COLOR_DEBUG = "\033[39m" # vanilla +COLOR_INFO = "\033[38;5;255m" # bright white +COLOR_WARNING = "\033[38;5;202m" # orange +COLOR_ERROR = "\033[1;31m" # red (intense) +COLOR_CRITICAL = "\033[4;31m" # underlined red +COLOR_RESET = "\033[0m\033[0K"; # resets all colors; blanks line from cursor pos # Scaling factors. WINDOWS_SCALING_FACTOR = 96.0 @@ -305,8 +314,12 @@ # Global variables used throughout the code. g_cliMode: bool = False +g_logToFile: bool = False +g_logVerbose: bool = False +g_terminalColors: bool = False g_outputDir: str = '' g_logPath: str = '' +g_pathSep: str = '' g_logLevelIntVar: Optional[tk.IntVar] = None g_logToFileBoolVar: Optional[tk.BooleanVar] = None @@ -316,6 +329,7 @@ g_isWindows: bool = False g_isWindowsVista: bool = False g_isWindows7: bool = False +g_isWindows10: bool = False g_tkRoot: Optional[tk.Tk] = None g_tkCanvas: Optional[tk.Canvas] = None @@ -336,6 +350,7 @@ g_usbEpIn: Any = None g_usbEpOut: Any = None g_usbEpMaxPacketSize: int = 0 +g_usbVer: str = "" g_nxdtVersionMajor: int = 0 g_nxdtVersionMinor: int = 0 @@ -351,6 +366,15 @@ g_nspFile: Optional[BufferedWriter] = None g_nspFilePath: str = '' +g_extractedFsDumpMode: bool = False + +g_formattedFileSize: float = 0 +g_fileSizeMiB: float = 0 +g_formattedFileUnit: str = 'B' +g_startTime: float = 0 + + + # Reference: https://beenje.github.io/blog/posts/logging-to-a-tkinter-scrolledtext-widget. class LogQueueHandler(logging.Handler): def __init__(self, log_queue: queue.Queue): @@ -359,15 +383,53 @@ def __init__(self, log_queue: queue.Queue): def emit(self, record: logging.LogRecord) -> None: if g_cliMode: - msg = self.format(record) - print(msg) + msg = self.format_message(record) + self.log_to_stdout(record, msg) + if g_logToFile: + self.log_to_file(msg) else: self.log_queue.put(record) - if g_logToFileBoolVar.get(): - logToFileMsg=datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]+'\t['+ \ - record.levelname+']\t'+self.format(record).strip()+'\n' - with open (g_logPath, 'a', encoding="utf-8") as f: - f.write(logToFileMsg) + + def format_message(self, record:logging.LogRecord) -> str: + msg = "" + prepend = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]+'\t['+record.levelname+']\t' + content = self.format(record) + if content[0] == '\n': + msg = prepend + '\n' + prepend + content = content[1:] + else: + msg = prepend + msg = msg + content + if content[-1:] == '\n': + msg = msg + prepend + '\n' + else: + msg = msg + '\n' + return msg + + def log_to_stdout(self, record:logging.LogRecord, msg: str) -> None: + if g_terminalColors: + match record.levelname: + case "DEBUG": + print(COLOR_DEBUG + msg, end="") + case "INFO": + print(COLOR_INFO + msg, end="") + case "WARNING": + print(COLOR_WARNING + msg, end="") + case "ERROR": + print(COLOR_ERROR + msg, end="") + case "CRITICAL": + print(COLOR_CRITICAL + msg, end="") + case _: + print(COLOR_DEBUG + msg, end="") + if g_isWindows10: + print(COLOR_BACKGROUND, end="") + else: + print(msg, end="") + + def log_to_file (self, msg: str) -> None: + with open (g_logPath, 'a', encoding="utf-8") as f: + f.write(msg) + # Reference: https://beenje.github.io/blog/posts/logging-to-a-tkinter-scrolledtext-widget. class LogConsole: @@ -408,6 +470,8 @@ def poll_log_queue(self) -> None: break else: self.display(record) + if g_logToFile: + self.queue_handler.log_to_file(self.queue_handler.format_message(record)) if self.frame: self.frame.after(100, self.poll_log_queue) @@ -466,6 +530,7 @@ def __del__(self): self.tk_parent.after(0, self.tk_window.destroy) def start(self, total: int, n: int = 0, divider: int = 1, prefix: str = '', unit: str = 'B') -> None: + if (total <= 0) or (n < 0) or (divider < 1): raise Exception('Invalid arguments!') @@ -477,6 +542,7 @@ def start(self, total: int, n: int = 0, divider: int = 1, prefix: str = '', unit self.unit = unit if self.tk_pbar: + #print() self.tk_pbar.configure(maximum=self.total_div, mode='determinate') self.start_time = time.time() else: @@ -548,7 +614,7 @@ def end(self) -> None: #assert self.pbar is not None self.pbar.close() self.pbar = None - print() + #print() def set_prefix(self, prefix) -> None: self.prefix = prefix @@ -585,36 +651,73 @@ def utilsGetPath(path_arg: str, fallback_path: str, is_file: bool, create: bool def utilsGetWinFullPath(path_arg: str) -> str: return '\\\\?\\' + path_arg.replace("/", "\\") - -def utilsResetOutputDir() -> None: - global g_outputDir - global g_logPath - #assert g_tkDirText is not None - g_outputDir = g_tkDirText.get('1.0', tk.END).strip() - if not g_outputDir: - # We should never reach this, honestly. - messagebox.showerror('Error', 'You must provide an output directory!', parent=g_tkRoot) - return +def utilsUpdateLogPath() -> None: + global g_logPath - if g_logToFileBoolVar.get(): - g_logPath = os.path.abspath(g_outputDir + os.path.sep + \ + g_logPath = os.path.abspath(g_outputDir + os.path.sep + \ "nxdt_host_" + datetime.now().strftime('%Y-%m-%d_%H%M%S') + '.log') - if g_isWindows: + if g_isWindows: g_logPath = utilsGetWinFullPath(g_logPath) - print(g_logPath) - else: - print("Not logging to file.") - # Make sure the full directory tree exists. - try: - os.makedirs(g_outputDir, exist_ok=True) - except: - utilsLogException(traceback.format_exc()) - messagebox.showerror('Error', 'Unable to create full output directory tree!', parent=g_tkRoot) - return + return + +# Enable terminal colors on *nix and supported Windows (10.0.10586+) + +def utilsSetupTerminal() -> None: + global g_terminalColors + + # ref0: https://stackoverflow.com/a/36760881 + # ref1: https://learn.microsoft.com/en-us/windows/console/setconsolemode + + if g_isWindows10: + try: + import ctypes + ctypes.windll.kernel32.GetStdHandle((-11), 7) + g_terminalColors = True + except: + utilsLogException(tracebackSer.format_exc()) + g_terminalColors = False + else: + if not g_isWindows: + g_terminalColors = True + else: + g_terminalColors = False + + # If colors supported, unconditionally set background color to black + if g_terminalColors: + print(COLOR_BACKGROUND) + +# Log basic info about the script and settings. +def utilsLogBasicInfo() -> None: + global g_logToFile, g_logVerbose, g_pathSep + g_logger.info('\n' + SCRIPT_TITLE + '. ' + COPYRIGHT_TEXT + '.') + g_logger.info('\nServer started...\n') + g_logger.info('Sys:\tPython ' + platform.python_version() + " on "+ g_osType+" "+g_osVersion) + g_logger.info('Dst:\t' + g_outputDir) + + if g_logToFile: + g_logger.info('Log:\t' + g_logPath.rsplit(g_pathSep, 1)[-1]) + else: + g_logger.info('Logging to file is disabled.') + if g_logVerbose: + g_logger.info('Verbose logging is enabled.\n') + else: + g_logger.info('Verbose logging is disabled.\n') + return + +# On successful transfer, log elapsed time and (within reason) average transfer speed +def utilsLogTransferStats(elapsed_time: float) -> None: + if g_formattedFileUnit == "GiB" or g_formattedFileUnit == "MiB": + formatted_time = f'{elapsed_time:.2f}s' if round(elapsed_time < 60) else tqdm.format_interval(elapsed_time) + g_logger.info(f'{g_formattedFileSize:.2f}{g_formattedFileUnit} transferred in {formatted_time}.') + if elapsed_time > float(1): + g_logger.info(f'Avg speed: {g_fileSizeMiB/elapsed_time:.2f}MiB/s\n') + else: + g_logger.info(" ") + def utilsIsValueAlignedToEndpointPacketSize(value: int) -> bool: return bool((value & (g_usbEpMaxPacketSize - 1)) == 0) @@ -648,8 +751,16 @@ def utilsGetSizeUnitAndDivisor(size: int) -> Tuple[str, int]: return ret +def utilsInitTransferVars(file_size: int) -> None: + global g_formattedFileSize, g_formattedFileUnit, g_fileSizeMiB, g_startTime + (g_formattedFileUnit,divisor) = utilsGetSizeUnitAndDivisor(file_size) + g_formattedFileSize = file_size / divisor + g_fileSizeMiB = file_size / 1048576 + g_startTime = time.time() + + def usbGetDeviceEndpoints() -> bool: - global g_usbEpIn, g_usbEpOut, g_usbEpMaxPacketSize + global g_usbEpIn, g_usbEpOut, g_usbEpMaxPacketSize, g_usbVer #assert g_logger is not None #assert g_stopEvent is not None @@ -660,7 +771,7 @@ def usbGetDeviceEndpoints() -> bool: usb_version = 0 if g_cliMode: - g_logger.info(f'Please connect a Nintendo Switch console running {USB_DEV_PRODUCT}.') + g_logger.info(f'Please connect a Nintendo Switch console running {USB_DEV_PRODUCT}.\n') while True: # Check if the user decided to stop the server. @@ -710,13 +821,10 @@ def usbGetDeviceEndpoints() -> bool: usb_version = cur_dev.bcdUSB break - + g_usbVer = f'USB {usb_version >> 8}.{(usb_version & 0xFF) >> 4}' g_logger.debug(f'Successfully retrieved USB endpoints! (bus {cur_dev.bus}, address {cur_dev.address}).') - g_logger.debug(f'Max packet size: 0x{g_usbEpMaxPacketSize:X} (USB {usb_version >> 8}.{(usb_version & 0xFF) >> 4}).\n') - - if g_cliMode: - g_logger.info(f'Exit {USB_DEV_PRODUCT} or disconnect your console at any time to close this script.') - + g_logger.debug(f'Max packet size: 0x{g_usbEpMaxPacketSize:X} ({g_usbVer}).') + return True def usbRead(size: int, timeout: int = -1) -> bytes: @@ -730,7 +838,7 @@ def usbRead(size: int, timeout: int = -1) -> bytes: except usb.core.USBError: if not g_cliMode: utilsLogException(traceback.format_exc()) - g_logger.error('\nUSB timeout triggered or console disconnected.') + g_logger.error('\nUSB timeout triggered or console disconnected.\n') return rd @@ -744,7 +852,7 @@ def usbWrite(data: bytes, timeout: int = -1) -> int: except usb.core.USBError: if not g_cliMode: utilsLogException(traceback.format_exc()) - g_logger.error('\nUSB timeout triggered or console disconnected.') + g_logger.error('\nUSB timeout triggered or console disconnected.\n') return wr @@ -757,10 +865,7 @@ def usbHandleStartSession(cmd_block: bytes) -> int: #assert g_logger is not None - if g_cliMode: - print() - - g_logger.debug(f'Received StartSession ({USB_CMD_START_SESSION:02X}) command.') + g_logger.debug(f'\nReceived StartSession ({USB_CMD_START_SESSION:02X}) command.') # Parse command block. (g_nxdtVersionMajor, g_nxdtVersionMinor, g_nxdtVersionMicro, abi_version, git_commit) = struct.unpack_from(' int: g_nxdtAbiVersionMinor = (abi_version & 0x0F) # Print client info. - g_logger.info(f'Client info: {USB_DEV_PRODUCT} v{g_nxdtVersionMajor}.{g_nxdtVersionMinor}.{g_nxdtVersionMicro}, USB ABI v{g_nxdtAbiVersionMajor}.{g_nxdtAbiVersionMinor} (commit {g_nxdtGitCommit}).\n') + g_logger.info(f'Client: {USB_DEV_PRODUCT} v{g_nxdtVersionMajor}.{g_nxdtVersionMinor}.{g_nxdtVersionMicro}, USB ABI v{g_nxdtAbiVersionMajor}.{g_nxdtAbiVersionMinor} (commit {g_nxdtGitCommit}).') + if not g_logVerbose: + g_logger.info(f'Connection: {g_usbVer}.\n') + + if g_cliMode: + g_logger.info(f'Exit {USB_DEV_PRODUCT} or disconnect your console at any time to close this script.\n') + # Check if we support this ABI version. if (g_nxdtAbiVersionMajor != USB_ABI_VERSION_MAJOR) or (g_nxdtAbiVersionMinor != USB_ABI_VERSION_MINOR): - g_logger.error('Unsupported ABI version!') + g_logger.error('\nUnsupported ABI version!\n') return USB_STATUS_UNSUPPORTED_ABI_VERSION # Return status code. @@ -783,41 +894,61 @@ def usbHandleStartSession(cmd_block: bytes) -> int: def usbHandleSendFileProperties(cmd_block: bytes) -> int | None: global g_nspTransferMode, g_nspSize, g_nspHeaderSize, g_nspRemainingSize, g_nspFile, g_nspFilePath, g_outputDir, g_tkRoot, g_progressBarWindow + global g_formattedFileSize, g_formattedFileUnit, g_fileSizeMiB, g_startTime #assert g_logger is not None #assert g_progressBarWindow is not None - if g_cliMode and not g_nspTransferMode: - print() - - g_logger.debug(f'Received SendFileProperties ({USB_CMD_SEND_FILE_PROPERTIES:02X}) command.') + g_logger.debug(f'\nReceived SendFileProperties ({USB_CMD_SEND_FILE_PROPERTIES:02X}) command.') # Parse command block. (file_size, filename_length, nsp_header_size, raw_filename) = struct.unpack_from(f' 0: dbg_str += f' | NSP header size: 0x{nsp_header_size:X}' g_logger.debug(dbg_str + '.') - file_type_str = ('file' if (not g_nspTransferMode) else 'NSP file entry') - - if not g_cliMode or (g_cliMode and not g_nspTransferMode): - g_logger.info(f'Receiving {file_type_str}: "{filename}".') - + # Log basic file info + if not g_nspTransferMode and not g_extractedFsDumpMode: + ext = filename[-3:] + match ext: + case "xci": g_logger.info("\tXCI transfer started!") + case "bin": g_logger.info("\tGamecard extra data transfer started!") + case "nsp": g_logger.info("\tNSP transfer started!") + case "nca": g_logger.info("\tRaw NCA transfer started!") + case "tik": g_logger.info("\tTicket transfer started!") + case _: g_logger.info("\tTransfer of unknown data type started!") # uh-oh? + utilsInitTransferVars(file_size) + g_logger.info(f'\nFile:\t{filename}') + g_logger.info(f'Size:\t{g_formattedFileSize:.2f}{g_formattedFileUnit}') + if(ext == "nsp"): + g_logger.info(f'Contents:') + else: + (unit,div) = utilsGetSizeUnitAndDivisor(file_size) + fs = file_size / div + if g_extractedFsDumpMode: + path_array = filename.split("/") + fn = '/'.join(path_array[7:]) + elif g_nspTransferMode: + fn = filename + g_logger.info(f'\t{fn} ({fs:.2f} {unit})') + # Perform validity checks. if (not g_nspTransferMode) and file_size and (nsp_header_size >= file_size): - g_logger.error('NSP header size must be smaller than the full NSP size!\n') + g_logger.error('\nNSP header size must be smaller than the full NSP size!\n') return USB_STATUS_MALFORMED_CMD if g_nspTransferMode and nsp_header_size: - g_logger.error('Received non-zero NSP header size during NSP transfer mode!\n') + g_logger.error('\nReceived non-zero NSP header size during NSP transfer mode!\n') return USB_STATUS_MALFORMED_CMD if (not filename_length) or (filename_length > USB_FILE_PROPERTIES_MAX_NAME_LENGTH): - g_logger.error('Invalid filename length!\n') + g_logger.error('\nInvalid filename length!\n') return USB_STATUS_MALFORMED_CMD # Enable NSP transfer mode (if needed). @@ -828,7 +959,7 @@ def usbHandleSendFileProperties(cmd_block: bytes) -> int | None: g_nspRemainingSize = (file_size - nsp_header_size) g_nspFile = None g_nspFilePath = '' - g_logger.debug('NSP transfer mode enabled!\n') + g_logger.debug('\nNSP transfer mode enabled!') # Perform additional validity checks and get a file object to work with. if (not g_nspTransferMode) or (g_nspFile is None): @@ -848,14 +979,14 @@ def usbHandleSendFileProperties(cmd_block: bytes) -> int | None: # Make sure the output filepath doesn't point to an existing directory. if os.path.exists(fullpath) and (not os.path.isfile(fullpath)): utilsResetNspInfo() - g_logger.error(f'Output filepath points to an existing directory! ("{fullpath}").\n') + g_logger.error(f'\nOutput filepath points to an existing directory! ("{fullpath}").\n') return USB_STATUS_HOST_IO_ERROR # Make sure we have enough free space. (total_space, used_space, free_space) = shutil.disk_usage(dirpath) if free_space <= file_size: utilsResetNspInfo() - g_logger.error('Not enough free space available in output volume!\n') + g_logger.error('\nNot enough free space available in output volume!\n') return USB_STATUS_HOST_IO_ERROR # Get file object. @@ -882,13 +1013,12 @@ def usbHandleSendFileProperties(cmd_block: bytes) -> int | None: # Close file (if needed). if not g_nspTransferMode: file.close() - # Let the command handler take care of sending the status response for us. return USB_STATUS_SUCCESS # Send status response before entering the data transfer stage. usbSendStatus(USB_STATUS_SUCCESS) - + # Start data transfer stage. g_logger.debug(f'Data transfer started. Saving {file_type_str} to: "{fullpath}".') @@ -935,6 +1065,9 @@ def cancelTransfer(): # Start transfer process. start_time = time.time() + # if not g_nspTransferMode: + # g_startTime = start_time + # print(f'1: {g_startTime}') while offset < file_size: # Update block size (if needed). @@ -949,7 +1082,7 @@ def cancelTransfer(): # Read current chunk. chunk = usbRead(rd_size, USB_TRANSFER_TIMEOUT) if not chunk: - g_logger.error(f'Failed to read 0x{rd_size:X}-byte long data chunk!') + g_logger.error(f'\nFailed to read 0x{rd_size:X}-byte long data chunk!\n') # Cancel file transfer. cancelTransfer() @@ -966,7 +1099,7 @@ def cancelTransfer(): # Cancel file transfer. cancelTransfer() - g_logger.debug(f'Received CancelFileTransfer ({USB_CMD_CANCEL_FILE_TRANSFER:02X}) command.') + g_logger.debug(f'\nReceived CancelFileTransfer ({USB_CMD_CANCEL_FILE_TRANSFER:02X}) command.') g_logger.warning('Transfer cancelled.') # Let the command handler take care of sending the status response for us. @@ -987,30 +1120,34 @@ def cancelTransfer(): if use_pbar: g_progressBarWindow.update(chunk_size) - elapsed_time = round(time.time() - start_time) - g_logger.debug(f'File transfer successfully completed in {tqdm.format_interval(elapsed_time)}!\n') - - # Close file handle (if needed). - if not g_nspTransferMode: - file.close() + elapsed_time = time.time() - start_time + g_logger.debug(f'\nFile transfer successfully completed in {elapsed_time:.2f}s!') # Hide progress bar window (if needed). if use_pbar and ((not g_nspTransferMode) or (not g_nspRemainingSize)): g_progressBarWindow.end() + # Close file handle (if needed); log successful non-constitutent transfer. + if not g_nspTransferMode: + file.close() + if not g_extractedFsDumpMode: + if not g_logVerbose: + g_logger.info('\n\tTransfer complete!\n') + utilsLogTransferStats(elapsed_time); + return USB_STATUS_SUCCESS def usbHandleCancelFileTransfer(cmd_block: bytes) -> int: #assert g_logger is not None - g_logger.debug(f'Received CancelFileTransfer ({USB_CMD_START_SESSION:02X}) command.') + g_logger.debug(f'\nReceived CancelFileTransfer ({USB_CMD_START_SESSION:02X}) command.') if g_nspTransferMode: utilsResetNspInfo(True) g_logger.warning('Transfer cancelled.') return USB_STATUS_SUCCESS else: - g_logger.error('Unexpected transfer cancellation.') + g_logger.error('\nUnexpected transfer cancellation.\n') return USB_STATUS_MALFORMED_CMD def usbHandleSendNspHeader(cmd_block: bytes) -> int: @@ -1021,27 +1158,33 @@ def usbHandleSendNspHeader(cmd_block: bytes) -> int: nsp_header_size = len(cmd_block) - g_logger.debug(f'Received SendNspHeader ({USB_CMD_SEND_NSP_HEADER:02X}) command.') + g_logger.debug(f'\nReceived SendNspHeader ({USB_CMD_SEND_NSP_HEADER:02X}) command.') # Validity checks. if not g_nspTransferMode: - g_logger.error('Received NSP header out of NSP transfer mode!\n') + g_logger.error('\nReceived NSP header out of NSP transfer mode!\n') return USB_STATUS_MALFORMED_CMD if g_nspRemainingSize: - g_logger.error(f'Received NSP header before receiving all NSP data! (missing 0x{g_nspRemainingSize:X} byte[s]).\n') + g_logger.error(f'\nReceived NSP header before receiving all NSP data! (missing 0x{g_nspRemainingSize:X} byte[s]).\n') return USB_STATUS_MALFORMED_CMD if nsp_header_size != g_nspHeaderSize: - g_logger.error(f'NSP header size mismatch! (0x{nsp_header_size:X} != 0x{g_nspHeaderSize:X}).\n') + g_logger.error(f'\nNSP header size mismatch! (0x{nsp_header_size:X} != 0x{g_nspHeaderSize:X}).\n') return USB_STATUS_MALFORMED_CMD # Write NSP header. g_nspFile.seek(0) g_nspFile.write(cmd_block) + + # Log successful NSP transfer (header distinguishes it from constituent NCA transfers) - g_logger.debug(f'Successfully wrote 0x{nsp_header_size:X}-byte long NSP header to "{g_nspFilePath}".\n') + g_logger.debug(f'Successfully wrote 0x{nsp_header_size:X}-byte long NSP header to "{g_nspFilePath}".') + if not g_logVerbose: + g_logger.info('\n\tTransfer complete!\n') + utilsLogTransferStats(time.time() - g_startTime) + # Disable NSP transfer mode. utilsResetNspInfo() @@ -1049,31 +1192,49 @@ def usbHandleSendNspHeader(cmd_block: bytes) -> int: def usbHandleEndSession(cmd_block: bytes) -> int: #assert g_logger is not None - g_logger.debug(f'Received EndSession ({USB_CMD_END_SESSION:02X}) command.') + g_logger.debug(f'\nReceived EndSession ({USB_CMD_END_SESSION:02X}) command.') return USB_STATUS_SUCCESS def usbHandleStartExtractedFsDump(cmd_block: bytes) -> int: #assert g_logger is not None - - g_logger.debug(f'Received StartExtractedFsDump ({USB_CMD_START_EXTRACTED_FS_DUMP:02X}) command.') + global g_extractedFsDumpMode, g_formattedFileSize, g_formattedFileUnit, g_fileSizeMiB, g_startTime + + g_logger.debug(f'\nReceived StartExtractedFsDump ({USB_CMD_START_EXTRACTED_FS_DUMP:02X}) command.') if g_nspTransferMode: - g_logger.error('StartExtractedFsDump received mid NSP transfer.') + g_logger.error('\nStartExtractedFsDump received mid NSP transfer.\n') return USB_STATUS_MALFORMED_CMD # Parse command block. (extracted_fs_size, extracted_fs_root_path) = struct.unpack_from(f' int: + global g_extractedFsDumpMode #assert g_logger is not None - g_logger.debug(f'Received EndExtractedFsDump ({USB_CMD_END_EXTRACTED_FS_DUMP:02X}) command.') - g_logger.info(f'Finished extracted FS dump.') + g_extractedFsDumpMode = False + g_logger.debug(f'\nReceived EndExtractedFsDump ({USB_CMD_END_EXTRACTED_FS_DUMP:02X}) command.') + if not g_logVerbose: + g_logger.info(f'\n\tExtracted FS dump complete!\n') + utilsLogTransferStats(time.time() - g_startTime) return USB_STATUS_SUCCESS def usbCommandHandler() -> None: @@ -1110,7 +1271,7 @@ def usbCommandHandler() -> None: # Read command header. cmd_header = usbRead(USB_CMD_HEADER_SIZE) if (not cmd_header) or (len(cmd_header) != USB_CMD_HEADER_SIZE): - g_logger.error(f'Failed to read 0x{USB_CMD_HEADER_SIZE:X}-byte long command header!') + g_logger.error(f'\nFailed to read 0x{USB_CMD_HEADER_SIZE:X}-byte long command header!\n') break # Parse command header. @@ -1128,19 +1289,19 @@ def usbCommandHandler() -> None: cmd_block = usbRead(rd_size, USB_TRANSFER_TIMEOUT) if (not cmd_block) or (len(cmd_block) != cmd_block_size): - g_logger.error(f'Failed to read 0x{cmd_block_size:X}-byte long command block for command ID {cmd_id:02X}!') + g_logger.error(f'\nFailed to read 0x{cmd_block_size:X}-byte long command block for command ID {cmd_id:02X}!\n') break # Verify magic word. if magic != USB_MAGIC_WORD: - g_logger.error('Received command header with invalid magic word!\n') + g_logger.error('\nReceived command header with invalid magic word!\n') usbSendStatus(USB_STATUS_INVALID_MAGIC_WORD) continue # Get command handler function. cmd_func = cmd_dict.get(cmd_id, None) if cmd_func is None: - g_logger.error(f'Received command header with unsupported ID {cmd_id:02X}.\n') + g_logger.error(f'\nReceived command header with unsupported ID {cmd_id:02X}.\n') usbSendStatus(USB_STATUS_UNSUPPORTED_CMD) continue @@ -1149,7 +1310,7 @@ def usbCommandHandler() -> None: (cmd_id == USB_CMD_SEND_FILE_PROPERTIES and cmd_block_size != USB_CMD_BLOCK_SIZE_SEND_FILE_PROPERTIES) or \ (cmd_id == USB_CMD_SEND_NSP_HEADER and not cmd_block_size) or \ (cmd_id == USB_CMD_START_EXTRACTED_FS_DUMP and cmd_block_size != USB_CMD_BLOCK_SIZE_START_EXTRACTED_FS_DUMP): - g_logger.error(f'Invalid command block size for command ID {cmd_id:02X}! (0x{cmd_block_size:X}).\n') + g_logger.error(f'\nInvalid command block size for command ID {cmd_id:02X}! (0x{cmd_block_size:X}).\n') usbSendStatus(USB_STATUS_MALFORMED_CMD) continue @@ -1159,7 +1320,7 @@ def usbCommandHandler() -> None: if (status is None) or (not usbSendStatus(status)) or (cmd_id == USB_CMD_END_SESSION) or (status == USB_STATUS_UNSUPPORTED_ABI_VERSION): break - g_logger.info('\nStopping server.') + g_logger.info('Stopping server...\n') if not g_cliMode: # Update UI. @@ -1170,25 +1331,21 @@ def uiStopServer() -> None: #assert g_stopEvent is not None g_stopEvent.set() # Log the end of the session. - g_logger.info("\nServer stopped.\n") + g_logger.info("Server stopped.\n") def uiStartServer() -> None: - # Reset output path variable based on possible user input unconditionally. - utilsResetOutputDir() - + # Set new log path for this session if logging to file is turned on. + if g_logToFile: + utilsUpdateLogPath() + # Update UI. uiToggleElements(False) - # Log basic session status and info. - g_logger.info('\nServer started.') - g_logger.info('Output Dir:\t' + g_outputDir) - if g_logToFileBoolVar.get(): - g_logger.info('Logging to:\t' + g_logPath.rsplit('\\', 1)[-1] + '\n') - else: - g_logger.info('Logging to file disabled.\n') - + # Log basic info about the script and settings. + utilsLogBasicInfo() + # Create background server thread. server_thread = threading.Thread(target=usbCommandHandler, daemon=True) server_thread.start() @@ -1225,9 +1382,16 @@ def uiToggleElements(flag: bool) -> None: g_tkVerboseCheckbox.configure(state='disabled') def uiChooseDirectory() -> None: - dir = filedialog.askdirectory(parent=g_tkRoot, title='Select an output directory', initialdir=INITIAL_DIR, mustexist=True) + #assert g_tkDirText is not None + dirtext = g_tkDirText.get('1.0', tk.END).strip() + initdir = dirtext if os.path.exists(dirtext) else INITIAL_DIR + + dir = filedialog.askdirectory(parent=g_tkRoot, title='Select an output directory', initialdir=initdir, mustexist=True) + if dir: uiUpdateDirectoryField(os.path.abspath(dir)) + uiUpdateOutputDir() + def uiUpdateDirectoryField(path: str) -> None: #assert g_tkDirText is not None @@ -1236,6 +1400,27 @@ def uiUpdateDirectoryField(path: str) -> None: g_tkDirText.insert('1.0', path) g_tkDirText.configure(state='disabled') + +def uiUpdateOutputDir() -> None: + global g_outputDir + #assert g_tkDirText is not None + + g_outputDir = g_tkDirText.get('1.0', tk.END).strip() + if not g_outputDir: + # We should never reach this, honestly. + messagebox.showerror('Error', 'You must provide an output directory!', parent=g_tkRoot) + return + + # Make sure the full directory tree exists. + try: + os.makedirs(g_outputDir, exist_ok=True) + except: + utilsLogException(traceback.format_exc()) + messagebox.showerror('Error', 'Unable to create full output directory tree!', parent=g_tkRoot) + return + + return + def uiHandleExitProtocol() -> None: #assert g_tkRoot is not None g_tkRoot.destroy() @@ -1246,13 +1431,24 @@ def uiHandleExitProtocolStub() -> None: def uiScaleMeasure(measure: int) -> int: return round(float(measure) * SCALE) +def uiHandleLogToFileCheckbox() -> None: + #assert g_logToFile is not None + #assert g_logToFileBoolVar is not None + global g_logToFile + g_logToFile = g_logToFileBoolVar.get() + return + def uiHandleVerboseCheckbox() -> None: #assert g_logger is not None #assert g_logLevelIntVar is not None - g_logger.setLevel(g_logLevelIntVar.get()) + global g_logVerbose + logLevel=g_logLevelIntVar.get() + g_logger.setLevel(logLevel) + g_logVerbose = True if(logLevel == logging.DEBUG) else False + return def uiInitialize() -> None: - global SCALE, g_logLevelIntVar, g_logToFileBoolVar + global SCALE, g_logLevelIntVar, g_logToFileBoolVar, g_logToFile, g_logVerbose global g_tkRoot, g_tkCanvas, g_tkDirText, g_tkChooseDirButton, g_tkServerButton, g_tkTipMessage, g_tkScrolledTextLog, g_tkLogToFileCheckbox, g_tkVerboseCheckbox global g_stopEvent, g_tlb, g_taskbar, g_progressBarWindow @@ -1325,6 +1521,7 @@ def uiInitialize() -> None: default_font = font.nametofont('TkDefaultFont') default_font_family = ('Segoe UI' if g_isWindows else 'sans-serif') default_font_size = (-12 if g_isWindows else -10) # Measured in pixels. Reference: https://docs.python.org/3/library/tkinter.font.html + padding_bottom = WINDOW_HEIGHT + default_font_size - 1 default_font.configure(family=default_font_family, size=uiScaleMeasure(default_font_size), weight=font.NORMAL) """print(screen_width_px, screen_height_px) @@ -1341,10 +1538,11 @@ def uiInitialize() -> None: g_tkCanvas = tk.Canvas(g_tkRoot, width=window_width_px, height=window_height_px) g_tkCanvas.pack() - g_tkCanvas.create_text(uiScaleMeasure(60), uiScaleMeasure(30), text='Output Dir:', anchor=tk.CENTER) + g_tkCanvas.create_text(uiScaleMeasure(60), uiScaleMeasure(30), text='Output directory:', anchor=tk.CENTER) g_tkDirText = tk.Text(g_tkRoot, height=1, width=45, font=default_font, wrap='none', state='disabled', bg='#F0F0F0') uiUpdateDirectoryField(g_outputDir) + g_tkCanvas.create_window(uiScaleMeasure(260), uiScaleMeasure(30), window=g_tkDirText, anchor=tk.CENTER) g_tkChooseDirButton = tk.Button(g_tkRoot, text='Choose', width=10, command=uiChooseDirectory) @@ -1364,18 +1562,19 @@ def uiInitialize() -> None: g_tkScrolledTextLog.tag_config('CRITICAL', foreground='red', underline=True) g_tkCanvas.create_window(uiScaleMeasure(int(WINDOW_WIDTH / 2)), uiScaleMeasure(280), window=g_tkScrolledTextLog, anchor=tk.CENTER) - g_tkCanvas.create_text(uiScaleMeasure(5), uiScaleMeasure(WINDOW_HEIGHT - 13), text=COPYRIGHT_TEXT, anchor=tk.W) + g_tkCanvas.create_text(uiScaleMeasure(5), uiScaleMeasure(padding_bottom), text=COPYRIGHT_TEXT, anchor=tk.W) g_logToFileBoolVar = tk.BooleanVar() - g_tkLogToFileCheckbox = tk.Checkbutton(g_tkRoot, text='Log to file', variable=g_logToFileBoolVar,onvalue=True, offvalue=False) + g_tkLogToFileCheckbox = tk.Checkbutton(g_tkRoot, text='Log to file', variable=g_logToFileBoolVar,onvalue=True, offvalue=False, command=uiHandleLogToFileCheckbox) g_tkLogToFileCheckbox.select() + g_logToFile = g_logToFileBoolVar.get() - g_tkCanvas.create_window(uiScaleMeasure(WINDOW_WIDTH - 165), uiScaleMeasure(WINDOW_HEIGHT - 13), window=g_tkLogToFileCheckbox, anchor=tk.CENTER) + g_tkCanvas.create_window(uiScaleMeasure(WINDOW_WIDTH - 165), uiScaleMeasure(padding_bottom), window=g_tkLogToFileCheckbox, anchor=tk.CENTER) g_logLevelIntVar = tk.IntVar() g_tkVerboseCheckbox = tk.Checkbutton(g_tkRoot, text='Verbose output', variable=g_logLevelIntVar, onvalue=logging.DEBUG, offvalue=logging.INFO, command=uiHandleVerboseCheckbox) - g_tkCanvas.create_window(uiScaleMeasure(WINDOW_WIDTH - 55), uiScaleMeasure(WINDOW_HEIGHT - 13), window=g_tkVerboseCheckbox, anchor=tk.CENTER) + g_tkCanvas.create_window(uiScaleMeasure(WINDOW_WIDTH - 55), uiScaleMeasure(padding_bottom), window=g_tkVerboseCheckbox, anchor=tk.CENTER) # Initialize console logger. console = LogConsole(g_tkScrolledTextLog) @@ -1392,37 +1591,47 @@ def cliInitialize() -> None: global g_progressBarWindow #assert g_logger is not None - + + # determines whether to use colors in terminal and sets up accordingly + utilsSetupTerminal() + + # Set log path if logging to file specified at cmd line + if g_logToFile: + utilsUpdateLogPath() + # Initialize console logger. console = LogConsole() # Initialize progress bar window object. bar_format = '{percentage:.2f}% |{bar}| {n:.2f}/{total:.2f} [{elapsed}<{remaining}, {rate_fmt}]' g_progressBarWindow = ProgressBarWindow(bar_format) - - # Print info. - g_logger.info('\n' + SCRIPT_TITLE + '. ' + COPYRIGHT_TEXT + '.') - g_logger.info('Output Dir: "' + g_outputDir + '".\n') + + # Log basic info about the script and settings. + utilsLogBasicInfo() # Start USB command handler directly. usbCommandHandler() def main() -> int: - global g_cliMode, g_outputDir, g_osType, g_osVersion, g_isWindows, g_isWindowsVista, g_isWindows7, g_logger - + global g_cliMode, g_outputDir, g_logToFile, g_logVerbose, g_osType, g_osVersion, g_isWindows, g_isWindowsVista, g_isWindows7, g_isWindows10, g_pathSep, g_logger + # Disable warnings. warnings.filterwarnings("ignore") # Parse command line arguments. - parser = ArgumentParser(description=SCRIPT_TITLE + '. ' + COPYRIGHT_TEXT + '.') + parser = ArgumentParser(formatter_class=RawTextHelpFormatter, description=SCRIPT_TITLE + '. ' + COPYRIGHT_TEXT + '.') parser.add_argument('-c', '--cli', required=False, action='store_true', default=False, help='Start the script in CLI mode.') - parser.add_argument('-o', '--outdir', required=False, type=str, metavar='DIR', help='Path to output directory. Defaults to "' + DEFAULT_DIR + '".') + parser.add_argument('-o', '--outdir', required=False, type=str, metavar='DIR', help='Path to output directory; will attempt to create if non-existent.'+\ + '\nDefaults to "' + DEFAULT_DIR + '".') + parser.add_argument('-l', '--log', required=False, action='store_true', default=False, help='Enables logging to file in output directory in CLI mode.') parser.add_argument('-v', '--verbose', required=False, action='store_true', default=False, help='Enable verbose output.') args = parser.parse_args() # Update global flags. g_cliMode = args.cli g_outputDir = utilsGetPath(args.outdir, DEFAULT_DIR, False, True) + g_logToFile = args.log + g_logVerbose = args.verbose # Get OS information. g_osType = platform.system() @@ -1430,23 +1639,27 @@ def main() -> int: # Get Windows information. g_isWindows = (g_osType == 'Windows') - g_isWindowsVista = g_isWindows7 = False + g_isWindowsVista = g_isWindows7 = g_isWindows10 = False if g_isWindows: win_ver = g_osVersion.split('.') win_ver_major = int(win_ver[0]) win_ver_minor = int(win_ver[1]) - + win_build = int(win_ver[2]) g_isWindowsVista = (win_ver_major >= 6) g_isWindows7 = (True if (win_ver_major > 6) else (win_ver_major == 6 and win_ver_minor > 0)) + # ANSI colors in cmd.exe min. build + # ref: https://github.com/dart-lang/sdk/issues/28614#issuecomment-287282970 + g_isWindows10 = (win_ver_major >= 10 and win_build >= 10586) + g_pathSep = os.path.sep if not g_isWindows else '\\' # Setup logging mechanism. - logging.basicConfig(level=(logging.DEBUG if args.verbose else logging.INFO)) + logging.basicConfig(level=(logging.DEBUG if g_logVerbose else logging.INFO)) g_logger = logging.getLogger() if len(g_logger.handlers): # Remove stderr output handler from logger. We'll control standard output on our own. log_stderr = g_logger.handlers[0] g_logger.removeHandler(log_stderr) - + if g_cliMode: # Initialize CLI. cliInitialize() @@ -1463,7 +1676,8 @@ def main() -> int: ret = main() except KeyboardInterrupt: time.sleep(0.2) - print("\nHost script interrupted!") + g_logger.info("Host script exited!") + if g_isWindows10: print(COLOR_RESET) except Exception as e: utilsLogException(traceback.format_exc()) From a4c5bf483c65336a365449ab0f453248bff1d8b1 Mon Sep 17 00:00:00 2001 From: bilditup1 Date: Sun, 26 Nov 2023 19:17:20 -0500 Subject: [PATCH 04/10] Proper handling of extracted HFS dumps --- host/nxdt_host.py | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/host/nxdt_host.py b/host/nxdt_host.py index 96a96406..ba075213 100644 --- a/host/nxdt_host.py +++ b/host/nxdt_host.py @@ -919,13 +919,14 @@ def usbHandleSendFileProperties(cmd_block: bytes) -> int | None: match ext: case "xci": g_logger.info("\tXCI transfer started!") case "bin": g_logger.info("\tGamecard extra data transfer started!") + case "fs0": g_logger.info("\tHFS0 raw partition transfer started!") case "nsp": g_logger.info("\tNSP transfer started!") case "nca": g_logger.info("\tRaw NCA transfer started!") case "tik": g_logger.info("\tTicket transfer started!") case _: g_logger.info("\tTransfer of unknown data type started!") # uh-oh? utilsInitTransferVars(file_size) g_logger.info(f'\nFile:\t{filename}') - g_logger.info(f'Size:\t{g_formattedFileSize:.2f}{g_formattedFileUnit}') + g_logger.info(f'Size:\t{g_formattedFileSize:.2f} {g_formattedFileUnit}') if(ext == "nsp"): g_logger.info(f'Contents:') else: @@ -933,7 +934,10 @@ def usbHandleSendFileProperties(cmd_block: bytes) -> int | None: fs = file_size / div if g_extractedFsDumpMode: path_array = filename.split("/") - fn = '/'.join(path_array[7:]) + match path_array[1]: + case 'NCA FS': fn = '/'.join(path_array[7:]) + case 'HFS': fn = '/'.join(path_array[4:]) + case _: fn = '/'.join(path_array[1:]) elif g_nspTransferMode: fn = filename g_logger.info(f'\t{fn} ({fs:.2f} {unit})') @@ -1213,9 +1217,17 @@ def usbHandleStartExtractedFsDump(cmd_block: bytes) -> int: path_array = extracted_fs_root_path.split('/') if not g_logVerbose: - g_logger.info(f'\tExtracted FS dump started!') - g_logger.info(f'\nSrc:\t{path_array[4]}') - g_logger.info(f'\t{path_array[5]}, FS section #{path_array[6]}') + match path_array[1]: + case 'HFS': + g_logger.info(f'\tExtracted FS dump from HFS (Gamecard) started!') + g_logger.info(f'\nSrc:\t'+'/'.join(path_array[3:])) + case 'NCA FS': + g_logger.info(f'\tExtracted FS dump from NCA FS (NSP) started!') + g_logger.info(f'\nSrc:\t{path_array[4]}') + g_logger.info(f'\t{path_array[5]}, FS section #{path_array[6]}') + case _: + g__logger.info(f'\tExtracted FS dump from novel source (???) started!') + g_logger.info:(f'\nRoot:\t{extracted_fs_root_path}') g_logger.info(f'Size:\t{g_formattedFileSize:.2f} {g_formattedFileUnit}') g_logger.info(f'Files:') else: From 519831ec8fe13ff3af797c83593161fd63761a46 Mon Sep 17 00:00:00 2001 From: bilditup1 Date: Mon, 27 Nov 2023 00:08:14 -0500 Subject: [PATCH 05/10] Fixed incredibly moronic typo --- host/nxdt_host.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/host/nxdt_host.py b/host/nxdt_host.py index ba075213..083ecd28 100644 --- a/host/nxdt_host.py +++ b/host/nxdt_host.py @@ -1226,7 +1226,7 @@ def usbHandleStartExtractedFsDump(cmd_block: bytes) -> int: g_logger.info(f'\nSrc:\t{path_array[4]}') g_logger.info(f'\t{path_array[5]}, FS section #{path_array[6]}') case _: - g__logger.info(f'\tExtracted FS dump from novel source (???) started!') + g_logger.info(f'\tExtracted FS dump from novel source (???) started!') g_logger.info:(f'\nRoot:\t{extracted_fs_root_path}') g_logger.info(f'Size:\t{g_formattedFileSize:.2f} {g_formattedFileUnit}') g_logger.info(f'Files:') From 955821c45be4cff5496759864627e405a0ea5812 Mon Sep 17 00:00:00 2001 From: bilditup1 Date: Sun, 3 Dec 2023 22:26:58 -0500 Subject: [PATCH 06/10] merge conflicts, missed cases, printing improvements --- host/nxdt_host.py | 322 ++++++++++++++++++++++++++++------------------ 1 file changed, 200 insertions(+), 122 deletions(-) diff --git a/host/nxdt_host.py b/host/nxdt_host.py index 083ecd28..ac851a8e 100644 --- a/host/nxdt_host.py +++ b/host/nxdt_host.py @@ -367,11 +367,13 @@ g_nspFilePath: str = '' g_extractedFsDumpMode: bool = False +g_layeredFsDumpMode: bool = False g_formattedFileSize: float = 0 g_fileSizeMiB: float = 0 g_formattedFileUnit: str = 'B' g_startTime: float = 0 +g_extractedFsAbsRoot: str = "" @@ -434,7 +436,7 @@ def log_to_file (self, msg: str) -> None: # Reference: https://beenje.github.io/blog/posts/logging-to-a-tkinter-scrolledtext-widget. class LogConsole: def __init__(self, scrolled_text: Optional[scrolledtext.ScrolledText] = None): - #assert g_logger is not None + assert g_logger is not None self.scrolled_text = scrolled_text self.frame = (self.scrolled_text.winfo_toplevel() if self.scrolled_text else None) @@ -555,8 +557,8 @@ def update(self, n: int) -> None: return if self.tk_window: - #assert self.tk_text_var is not None - #assert self.tk_n_var is not None + assert self.tk_text_var is not None + assert self.tk_n_var is not None cur_n_div = (float(cur_n) / self.divider) self.elapsed_time = (time.time() - self.start_time) @@ -581,7 +583,7 @@ def update(self, n: int) -> None: if g_taskbar: g_taskbar.SetProgressValue(self.hwnd, cur_n, self.total) else: - #assert self.pbar is not None + assert self.pbar is not None n_div = (float(n) / self.divider) self.pbar.update(n_div) @@ -598,7 +600,7 @@ def end(self) -> None: self.elapsed_time = 0 if self.tk_window: - #assert self.tk_pbar is not None + assert self.tk_pbar is not None if g_taskbar: g_taskbar.SetProgressState(self.hwnd, g_tlb.TBPF_NOPROGRESS) @@ -611,10 +613,9 @@ def end(self) -> None: self.tk_pbar.configure(maximum=100, mode='indeterminate') else: - #assert self.pbar is not None + assert self.pbar is not None self.pbar.close() self.pbar = None - #print() def set_prefix(self, prefix) -> None: self.prefix = prefix @@ -647,19 +648,23 @@ def utilsGetPath(path_arg: str, fallback_path: str, is_file: bool, create: bool # ref0: https://learn.microsoft.com/en-us/windows/win32/fileio/naming-a-file?redirectedfrom=MSDN#win32-file-namespaces # ref1: https://learn.microsoft.com/en-us/windows/win32/fileio/maximum-file-path-limitation?tabs=registry # ref2: https://stackoverflow.com/a/15373771 -# Replaces '/' separator with proper '\' in case running under MSYS2 env. +# Also replaces '/' separator with proper '\' in case running under MSYS2 env. def utilsGetWinFullPath(path_arg: str) -> str: - return '\\\\?\\' + path_arg.replace("/", "\\") + global g_isWindows + return ('\\\\?\\' + path_arg.replace("/", "\\")) if g_isWindows else path_arg + +# Strips the preceding prefix when we want to print +def utilsStripWinPrefix(path_arg: str) -> str: + global g_isWindows + return path_arg[4:] if g_isWindows else path_arg + +# Updates the path of the log def utilsUpdateLogPath() -> None: global g_logPath - g_logPath = os.path.abspath(g_outputDir + os.path.sep + \ - "nxdt_host_" + datetime.now().strftime('%Y-%m-%d_%H%M%S') + '.log') - if g_isWindows: - g_logPath = utilsGetWinFullPath(g_logPath) - + "nxdt_host_" + datetime.now().strftime('%Y-%m-%d_%H%M%S') + '.log') return # Enable terminal colors on *nix and supported Windows (10.0.10586+) @@ -689,12 +694,17 @@ def utilsSetupTerminal() -> None: print(COLOR_BACKGROUND) # Log basic info about the script and settings. -def utilsLogBasicInfo() -> None: - global g_logToFile, g_logVerbose, g_pathSep +def utilsLogBasicScriptInfo() -> None: + global g_osType, g_osVersion, g_logToFile, g_logVerbose, g_pathSep, g_isWindows g_logger.info('\n' + SCRIPT_TITLE + '. ' + COPYRIGHT_TEXT + '.') g_logger.info('\nServer started...\n') g_logger.info('Sys:\tPython ' + platform.python_version() + " on "+ g_osType+" "+g_osVersion) - g_logger.info('Dst:\t' + g_outputDir) + + # if g_isWindows: + # g_logger.info('Dst:\t' + utilsStripWinPrefix(g_outputDir)) + # else: + # g_logger.info('Dst:\t' + g_outputDir) + g_logger.info('Dst:\t' + utilsStripWinPrefix(g_outputDir)) if g_logToFile: g_logger.info('Log:\t' + g_logPath.rsplit(g_pathSep, 1)[-1]) @@ -710,11 +720,12 @@ def utilsLogBasicInfo() -> None: # On successful transfer, log elapsed time and (within reason) average transfer speed def utilsLogTransferStats(elapsed_time: float) -> None: - if g_formattedFileUnit == "GiB" or g_formattedFileUnit == "MiB": + global g_formattedFileSize, g_formattedFileUnit, g_fileSizeMiB + if g_formattedFileUnit != "B": formatted_time = f'{elapsed_time:.2f}s' if round(elapsed_time < 60) else tqdm.format_interval(elapsed_time) - g_logger.info(f'{g_formattedFileSize:.2f}{g_formattedFileUnit} transferred in {formatted_time}.') + g_logger.info(f'{g_formattedFileSize:.2f} {g_formattedFileUnit} transferred in {formatted_time}.') if elapsed_time > float(1): - g_logger.info(f'Avg speed: {g_fileSizeMiB/elapsed_time:.2f}MiB/s\n') + g_logger.info(f'Avg speed: {g_fileSizeMiB/elapsed_time:.2f} MiB/s\n') else: g_logger.info(" ") @@ -762,8 +773,8 @@ def utilsInitTransferVars(file_size: int) -> None: def usbGetDeviceEndpoints() -> bool: global g_usbEpIn, g_usbEpOut, g_usbEpMaxPacketSize, g_usbVer - #assert g_logger is not None - #assert g_stopEvent is not None + assert g_logger is not None +# assert g_stopEvent is not None prev_dev = cur_dev = None usb_ep_in_lambda = lambda ep: usb.util.endpoint_direction(ep.bEndpointAddress) == usb.util.ENDPOINT_IN @@ -828,7 +839,6 @@ def usbGetDeviceEndpoints() -> bool: return True def usbRead(size: int, timeout: int = -1) -> bytes: - #assert g_logger is not None rd = b'' @@ -838,12 +848,12 @@ def usbRead(size: int, timeout: int = -1) -> bytes: except usb.core.USBError: if not g_cliMode: utilsLogException(traceback.format_exc()) - g_logger.error('\nUSB timeout triggered or console disconnected.\n') + if g_logger is not None: + g_logger.error('\nUSB timeout triggered or console disconnected.\n') return rd def usbWrite(data: bytes, timeout: int = -1) -> int: - #assert g_logger is not None wr = 0 @@ -852,7 +862,8 @@ def usbWrite(data: bytes, timeout: int = -1) -> int: except usb.core.USBError: if not g_cliMode: utilsLogException(traceback.format_exc()) - g_logger.error('\nUSB timeout triggered or console disconnected.\n') + if g_logger is not None: + g_logger.error('\nUSB timeout triggered or console disconnected.\n') return wr @@ -863,7 +874,7 @@ def usbSendStatus(code: int) -> bool: def usbHandleStartSession(cmd_block: bytes) -> int: global g_nxdtVersionMajor, g_nxdtVersionMinor, g_nxdtVersionMicro, g_nxdtAbiVersionMajor, g_nxdtAbiVersionMinor, g_nxdtGitCommit - #assert g_logger is not None + assert g_logger is not None g_logger.debug(f'\nReceived StartSession ({USB_CMD_START_SESSION:02X}) command.') @@ -896,8 +907,8 @@ def usbHandleSendFileProperties(cmd_block: bytes) -> int | None: global g_nspTransferMode, g_nspSize, g_nspHeaderSize, g_nspRemainingSize, g_nspFile, g_nspFilePath, g_outputDir, g_tkRoot, g_progressBarWindow global g_formattedFileSize, g_formattedFileUnit, g_fileSizeMiB, g_startTime - #assert g_logger is not None - #assert g_progressBarWindow is not None + assert g_logger is not None + assert g_progressBarWindow is not None g_logger.debug(f'\nReceived SendFileProperties ({USB_CMD_SEND_FILE_PROPERTIES:02X}) command.') @@ -907,40 +918,80 @@ def usbHandleSendFileProperties(cmd_block: bytes) -> int | None: file_type_str = ('file' if (not g_nspTransferMode) else 'NSP file entry') - # Print info (debug / verbose). + # Log basic file info (debug / verbose). dbg_str = f'File size: 0x{file_size:X} | Filename length: 0x{filename_length:X}' if nsp_header_size > 0: dbg_str += f' | NSP header size: 0x{nsp_header_size:X}' g_logger.debug(dbg_str + '.') - # Log basic file info - if not g_nspTransferMode and not g_extractedFsDumpMode: - ext = filename[-3:] - match ext: - case "xci": g_logger.info("\tXCI transfer started!") - case "bin": g_logger.info("\tGamecard extra data transfer started!") - case "fs0": g_logger.info("\tHFS0 raw partition transfer started!") - case "nsp": g_logger.info("\tNSP transfer started!") - case "nca": g_logger.info("\tRaw NCA transfer started!") - case "tik": g_logger.info("\tTicket transfer started!") - case _: g_logger.info("\tTransfer of unknown data type started!") # uh-oh? - utilsInitTransferVars(file_size) - g_logger.info(f'\nFile:\t{filename}') - g_logger.info(f'Size:\t{g_formattedFileSize:.2f} {g_formattedFileUnit}') - if(ext == "nsp"): - g_logger.info(f'Contents:') - else: - (unit,div) = utilsGetSizeUnitAndDivisor(file_size) - fs = file_size / div - if g_extractedFsDumpMode: - path_array = filename.split("/") - match path_array[1]: - case 'NCA FS': fn = '/'.join(path_array[7:]) - case 'HFS': fn = '/'.join(path_array[4:]) - case _: fn = '/'.join(path_array[1:]) - elif g_nspTransferMode: - fn = filename - g_logger.info(f'\t{fn} ({fs:.2f} {unit})') + # Log basic file info (verbose off): + if not g_logVerbose: + if not g_nspTransferMode and not g_extractedFsDumpMode: + fp = filename.split('/') # fp[0] is '/' character + fp_len = len(fp) + ext = filename[-3:] + utilsInitTransferVars(file_size) + g_logger.info("\nTransfer started!") + if (fp_len >= 2): # if file has parent directories + match fp[1]: # check parent dir and ext to find type + case 'NSP' | 'Ticket': + g_logger.info(f'\nType:\t{fp[1]}') + g_logger.info(f'Src:\t{fp[2][:fp[2].index("[")]}') + g_logger.info(f'\t{fp[2][fp[2].find("["):fp[2].rfind(".")]}') + g_logger.info(f'Size:\t{g_formattedFileSize:.2f} {g_formattedFileUnit}') + if(ext == 'nsp'): g_logger.info(f'Contents:') + case 'Gamecard': + g_logger.info(f'\nType:\t{fp[1]} [{ext.upper()}]') + g_logger.info(f'Src:\t{fp[2][:fp[2].index("[")]}') + g_logger.info(f'\t{fp[2][fp[2].find("["):fp[2].rfind(".")]}') + g_logger.info(f'Size:\t{g_formattedFileSize:.2f} {g_formattedFileUnit}') + if(ext == 'xci'): g_logger.info(" ") + case 'NCA FS': + g_logger.info(f'\nType:\t{fp[1]} ({fp[2]} {fp[3]}) [{ext.upper()}]') + g_logger.info(f'Src:\t{fp[4][:fp[4].index("[")]}') + g_logger.info(f'\t{fp[4][fp[4].index("["):]}') + g_logger.info(f'\t{fp[5]}, FS section #{fp[6][:fp[6].index(".")]}') + g_logger.info(f'Size:\t{g_formattedFileSize:.2f} {g_formattedFileUnit}\n') + case 'atmosphere': # System/NCA Dump -> Section Mode && Write Raw Section && Use LayeredFS Dir + g_logger.info(f'\nType:\tNCA FS (atmosphere/contents) [{ext.upper()}]') + g_logger.info(f'Src:\t{fp[3]}') + g_logger.info(f'\t{fp[4][:fp[4].index(".")]}') + g_logger.info(f'Size:\t{g_formattedFileSize:.2f} {g_formattedFileUnit}\n') + case 'NCA' | 'HFS': + printable_ext = fp[fp_len-1][fp[fp_len-1].index(".")+1:] if fp[1] == 'HFS' else ext + g_logger.info(f'\nType:\t{fp[1]} ({fp[2]}) [{printable_ext.upper()}]') + g_logger.info(f'Src:\t{fp[3][:fp[3].index("[")]}') + g_logger.info(f'\t{fp[3][fp[3].index("["):]}') + if fp[1] == 'NCA': + g_logger.info(f'File:\t{fp[fp_len-1]}') + else: + g_logger.info(f'\t{fp[fp_len-1][:fp[fp_len-1].index(".")]}') + g_logger.info(f'Size:\t{g_formattedFileSize:.2f} {g_formattedFileUnit}\n') + case _: # If we ever get here it is time for more work + g_logger.warning(f'\n\tNovel source!!! {fp[1]}???') + g_logger.info:(f'{filename}') + g_logger.info(f'Size:\t{g_formattedFileSize:.2f} {g_formattedFileUnit}\n') + else: # if file is just a file with no parent dirs + if (ext == 'bin'): # Thusfar the only such case + g_logger.info(f'\nType:\tConsole LAFW BLOB or similar (BIN)') + g_logger.info(f'File:\t{filename.rsplit('/', 1)[-1]}') + g_logger.info(f'Size:\t{g_formattedFileSize:.2f} {g_formattedFileUnit}') + else: # If we ever get here it is time for more work + g_logger.warning(f'\n\tNovel source!!!') + g_logger.info:(f'"{filename}"') + else: + (unit,div) = utilsGetSizeUnitAndDivisor(file_size) + fs = file_size / div + if g_extractedFsDumpMode: + fp = filename.split("/") + match fp[1]: + case 'HFS': fn = '/'.join(fp[5:]) + case 'NCA FS': fn = '/'.join(fp[7:]) + case 'atmosphere': fn = '/'.join(fp[5:]) + case _: fn = '/'.join(fp[1:]) + elif g_nspTransferMode: + fn = filename + g_logger.info(f'\t{fn} ({fs:.2f} {unit})') # Perform validity checks. if (not g_nspTransferMode) and file_size and (nsp_header_size >= file_size): @@ -970,10 +1021,12 @@ def usbHandleSendFileProperties(cmd_block: bytes) -> int | None: # Generate full, absolute path to the destination file. fullpath = os.path.abspath(g_outputDir + os.path.sep + filename) - # Unconditionally enable long paths in Windows. - if g_isWindows: - fullpath = utilsGetWinFullPath(fullpath); + printable_fullpath = (fullpath[4:] if g_isWindows else fullpath) + # Unconditionally enable long paths in Windows. + # if g_isWindows: + # fullpath = utilsGetWinFullPath(fullpath); + # Get parent directory path. dirpath = os.path.dirname(fullpath) @@ -983,7 +1036,7 @@ def usbHandleSendFileProperties(cmd_block: bytes) -> int | None: # Make sure the output filepath doesn't point to an existing directory. if os.path.exists(fullpath) and (not os.path.isfile(fullpath)): utilsResetNspInfo() - g_logger.error(f'\nOutput filepath points to an existing directory! ("{fullpath}").\n') + g_logger.error(f'\nOutput filepath points to an existing directory! ("{printable_fullpath}").\n') return USB_STATUS_HOST_IO_ERROR # Make sure we have enough free space. @@ -1011,7 +1064,8 @@ def usbHandleSendFileProperties(cmd_block: bytes) -> int | None: fullpath = g_nspFilePath dirpath = os.path.dirname(fullpath) - + printable_fullpath = (fullpath[4:] if g_isWindows else fullpath) + # Check if we're dealing with an empty file or with the first SendFileProperties command from a NSP. if (not file_size) or (g_nspTransferMode and file_size == g_nspSize): # Close file (if needed). @@ -1024,7 +1078,7 @@ def usbHandleSendFileProperties(cmd_block: bytes) -> int | None: usbSendStatus(USB_STATUS_SUCCESS) # Start data transfer stage. - g_logger.debug(f'Data transfer started. Saving {file_type_str} to: "{fullpath}".') + g_logger.debug(f'Data transfer started. Saving {file_type_str} to: "{printable_fullpath}".') offset = 0 blksize = USB_TRANSFER_BLOCK_SIZE @@ -1134,15 +1188,16 @@ def cancelTransfer(): # Close file handle (if needed); log successful non-constitutent transfer. if not g_nspTransferMode: file.close() - if not g_extractedFsDumpMode: - if not g_logVerbose: - g_logger.info('\n\tTransfer complete!\n') - utilsLogTransferStats(elapsed_time); + if not g_extractedFsDumpMode and not g_logVerbose: + g_logger.info('\nTransfer complete!\n') + utilsLogTransferStats(elapsed_time) + g_logger.info('Your file may be found here:') + g_logger.info(f'{printable_fullpath}\n') return USB_STATUS_SUCCESS def usbHandleCancelFileTransfer(cmd_block: bytes) -> int: - #assert g_logger is not None + assert g_logger is not None g_logger.debug(f'\nReceived CancelFileTransfer ({USB_CMD_START_SESSION:02X}) command.') @@ -1155,10 +1210,10 @@ def usbHandleCancelFileTransfer(cmd_block: bytes) -> int: return USB_STATUS_MALFORMED_CMD def usbHandleSendNspHeader(cmd_block: bytes) -> int: - global g_nspTransferMode, g_nspHeaderSize, g_nspRemainingSize, g_nspFile, g_nspFilePath + global g_nspTransferMode, g_nspHeaderSize, g_nspRemainingSize, g_nspFile, g_nspFilePath, g_isWindows - #assert g_logger is not None - #assert g_nspFile is not None + assert g_logger is not None + assert g_nspFile is not None nsp_header_size = len(cmd_block) @@ -1186,8 +1241,11 @@ def usbHandleSendNspHeader(cmd_block: bytes) -> int: g_logger.debug(f'Successfully wrote 0x{nsp_header_size:X}-byte long NSP header to "{g_nspFilePath}".') if not g_logVerbose: - g_logger.info('\n\tTransfer complete!\n') + g_logger.info('\nTransfer complete!\n') utilsLogTransferStats(time.time() - g_startTime) + g_logger.info('Your file may be found here:') + g_nspFilePath = utilsStripWinPrefix(g_nspFilePath) #if g_isWindows else g_nspFilePath + g_logger.info(f'{g_nspFilePath}\n') # Disable NSP transfer mode. utilsResetNspInfo() @@ -1195,13 +1253,13 @@ def usbHandleSendNspHeader(cmd_block: bytes) -> int: return USB_STATUS_SUCCESS def usbHandleEndSession(cmd_block: bytes) -> int: - #assert g_logger is not None + assert g_logger is not None g_logger.debug(f'\nReceived EndSession ({USB_CMD_END_SESSION:02X}) command.') return USB_STATUS_SUCCESS def usbHandleStartExtractedFsDump(cmd_block: bytes) -> int: - #assert g_logger is not None - global g_extractedFsDumpMode, g_formattedFileSize, g_formattedFileUnit, g_fileSizeMiB, g_startTime + assert g_logger is not None + global g_isWindows, g_outputDir, g_extractedFsDumpMode, g_extractedFsAbsRoot, g_formattedFileSize, g_formattedFileUnit, g_fileSizeMiB, g_startTime g_logger.debug(f'\nReceived StartExtractedFsDump ({USB_CMD_START_EXTRACTED_FS_DUMP:02X}) command.') @@ -1214,25 +1272,38 @@ def usbHandleStartExtractedFsDump(cmd_block: bytes) -> int: extracted_fs_root_path = extracted_fs_root_path.decode('utf-8').strip('\x00') utilsInitTransferVars(extracted_fs_size) - path_array = extracted_fs_root_path.split('/') + fp = extracted_fs_root_path.split('/') + # Non-verbose/debug logging: if not g_logVerbose: - match path_array[1]: + g_logger.info(f'Extracted FS dump started!') + match fp[1]: case 'HFS': - g_logger.info(f'\tExtracted FS dump from HFS (Gamecard) started!') - g_logger.info(f'\nSrc:\t'+'/'.join(path_array[3:])) + g_logger.info(f'\nType:\t{fp[1]} ({fp[2]})') + g_logger.info(f'Src:\t{fp[3][:fp[3].index("[")]}') + g_logger.info(f'\t{fp[3][fp[3].index("["):]}') + g_logger.info(f'\t{fp[4]}') case 'NCA FS': - g_logger.info(f'\tExtracted FS dump from NCA FS (NSP) started!') - g_logger.info(f'\nSrc:\t{path_array[4]}') - g_logger.info(f'\t{path_array[5]}, FS section #{path_array[6]}') + g_logger.info(f'\nType:\t{fp[1]} ({fp[2]} {fp[3]})') + g_logger.info(f'Src:\t{fp[4][:fp[4].index("[")]}') + g_logger.info(f'\t{fp[4][fp[4].index("["):]}') + g_logger.info(f'\t{fp[5]}, FS section #{fp[6]}') + case 'atmosphere': + g_logger.info(f'\nType:\tLayeredFS ('+('/'.join(fp[1:3]))+')') + g_logger.info(f'Src:\t{fp[3]}') + g_logger.info(f'\t{fp[4]}') case _: - g_logger.info(f'\tExtracted FS dump from novel source (???) started!') - g_logger.info:(f'\nRoot:\t{extracted_fs_root_path}') + g_logger.warning(f'\n\tNovel source!!! {fp[1]}???\n') + g_logger.info:(f'\nRoot:\t"{extracted_fs_root_path}"') + g_logger.info(f'Size:\t{g_formattedFileSize:.2f} {g_formattedFileUnit}') - g_logger.info(f'Files:') + g_logger.info(f'Contents:') else: g_logger.debug(f'Starting extracted FS dump (size 0x{extracted_fs_size:X}, output relative path "{extracted_fs_root_path}").') - + + #g_extractedFsAbsRoot = extracted_fs_root_path + g_extractedFsAbsRoot = os.path.abspath(g_outputDir + os.path.sep + extracted_fs_root_path) + g_extractedFsAbsRoot = utilsStripWinPrefix(g_extractedFsAbsRoot) #if g_isWindows else g_extractedFsAbsRoot g_extractedFsDumpMode = True g_startTime = time.time() @@ -1240,17 +1311,20 @@ def usbHandleStartExtractedFsDump(cmd_block: bytes) -> int: return USB_STATUS_SUCCESS def usbHandleEndExtractedFsDump(cmd_block: bytes) -> int: - global g_extractedFsDumpMode - #assert g_logger is not None - g_extractedFsDumpMode = False + global g_extractedFsDumpMode, g_extractedFsAbsRoot + assert g_logger is not None g_logger.debug(f'\nReceived EndExtractedFsDump ({USB_CMD_END_EXTRACTED_FS_DUMP:02X}) command.') if not g_logVerbose: - g_logger.info(f'\n\tExtracted FS dump complete!\n') + g_logger.info(f'\nExtracted FS dump complete!\n') utilsLogTransferStats(time.time() - g_startTime) + g_logger.info(f'Your files may be found here:') + g_logger.info(f'{g_extractedFsAbsRoot}{g_pathSep}\n') + g_extractedFsDumpMode = False + g_extractedFsAbsRoot = "" return USB_STATUS_SUCCESS def usbCommandHandler() -> None: - #assert g_logger is not None + assert g_logger is not None cmd_dict = { USB_CMD_START_SESSION: usbHandleStartSession, @@ -1271,8 +1345,8 @@ def usbCommandHandler() -> None: if not g_cliMode: # Update UI. - #assert g_tkCanvas is not None - #assert g_tkServerButton is not None + assert g_tkCanvas is not None + assert g_tkServerButton is not None g_tkCanvas.itemconfigure(g_tkTipMessage, state='normal', text=SERVER_STOP_MSG) g_tkServerButton.configure(state='disabled') @@ -1340,7 +1414,7 @@ def usbCommandHandler() -> None: def uiStopServer() -> None: # Signal the shared stop event. - #assert g_stopEvent is not None + assert g_stopEvent is not None g_stopEvent.set() # Log the end of the session. g_logger.info("Server stopped.\n") @@ -1348,27 +1422,28 @@ def uiStopServer() -> None: def uiStartServer() -> None: + uiUpdateOutputDir() # Set new log path for this session if logging to file is turned on. if g_logToFile: utilsUpdateLogPath() - + # Update UI. uiToggleElements(False) # Log basic info about the script and settings. - utilsLogBasicInfo() + utilsLogBasicScriptInfo() # Create background server thread. server_thread = threading.Thread(target=usbCommandHandler, daemon=True) server_thread.start() def uiToggleElements(flag: bool) -> None: - #assert g_tkRoot is not None - #assert g_tkChooseDirButton is not None - #assert g_tkServerButton is not None - #assert g_tkCanvas is not None - #assert g_tkLogToFileCheckbox is not None - #assert g_tkVerboseCheckbox is not None + assert g_tkRoot is not None + assert g_tkChooseDirButton is not None + assert g_tkServerButton is not None + assert g_tkCanvas is not None + assert g_tkLogToFileCheckbox is not None + assert g_tkVerboseCheckbox is not None if flag: g_tkRoot.protocol('WM_DELETE_WINDOW', uiHandleExitProtocol) @@ -1379,7 +1454,7 @@ def uiToggleElements(flag: bool) -> None: g_tkLogToFileCheckbox.configure(state='normal') g_tkVerboseCheckbox.configure(state='normal') else: - #assert g_tkScrolledTextLog is not None + assert g_tkScrolledTextLog is not None g_tkRoot.protocol('WM_DELETE_WINDOW', uiHandleExitProtocolStub) @@ -1394,19 +1469,19 @@ def uiToggleElements(flag: bool) -> None: g_tkVerboseCheckbox.configure(state='disabled') def uiChooseDirectory() -> None: - #assert g_tkDirText is not None + assert g_tkDirText is not None dirtext = g_tkDirText.get('1.0', tk.END).strip() - initdir = dirtext if os.path.exists(dirtext) else INITIAL_DIR - + initdir = dirtext if os.path.exists(dirtext) else INITIAL_DIR + # Using \\?\ longfile syntax for Windows will not work for tkinter.filedialog.askdirectory's initialdir parameter. + # User must choose a directory considered the 'normal' length on their system if using the UI. dir = filedialog.askdirectory(parent=g_tkRoot, title='Select an output directory', initialdir=initdir, mustexist=True) if dir: uiUpdateDirectoryField(os.path.abspath(dir)) - uiUpdateOutputDir() def uiUpdateDirectoryField(path: str) -> None: - #assert g_tkDirText is not None + assert g_tkDirText is not None g_tkDirText.configure(state='normal') g_tkDirText.delete('1.0', tk.END) g_tkDirText.insert('1.0', path) @@ -1415,9 +1490,11 @@ def uiUpdateDirectoryField(path: str) -> None: def uiUpdateOutputDir() -> None: global g_outputDir - #assert g_tkDirText is not None + assert g_tkDirText is not None - g_outputDir = g_tkDirText.get('1.0', tk.END).strip() + g_outputDir = utilsGetWinFullPath(g_tkDirText.get('1.0', tk.END).strip()) + # g_outputDir = utilsGetWinFullPath(g_outputDir) if g_isWindows else g_outputDir + if not g_outputDir: # We should never reach this, honestly. messagebox.showerror('Error', 'You must provide an output directory!', parent=g_tkRoot) @@ -1434,7 +1511,7 @@ def uiUpdateOutputDir() -> None: return def uiHandleExitProtocol() -> None: - #assert g_tkRoot is not None + assert g_tkRoot is not None g_tkRoot.destroy() def uiHandleExitProtocolStub() -> None: @@ -1444,15 +1521,14 @@ def uiScaleMeasure(measure: int) -> int: return round(float(measure) * SCALE) def uiHandleLogToFileCheckbox() -> None: - #assert g_logToFile is not None - #assert g_logToFileBoolVar is not None + assert g_logToFileBoolVar is not None global g_logToFile g_logToFile = g_logToFileBoolVar.get() return def uiHandleVerboseCheckbox() -> None: - #assert g_logger is not None - #assert g_logLevelIntVar is not None + assert g_logger is not None + assert g_logLevelIntVar is not None global g_logVerbose logLevel=g_logLevelIntVar.get() g_logger.setLevel(logLevel) @@ -1600,14 +1676,16 @@ def uiInitialize() -> None: g_tkRoot.mainloop() def cliInitialize() -> None: - global g_progressBarWindow + global g_progressBarWindow, g_outputDir, g_logToFile - #assert g_logger is not None + # Unconditionally enable long paths on Windows. + g_outputDir = utilsGetWinFullPath(g_outputDir) #if g_isWindows else g_outputDir - # determines whether to use colors in terminal and sets up accordingly + # Determines whether to use colors in terminal and sets up accordingly. utilsSetupTerminal() - # Set log path if logging to file specified at cmd line + # Set log path if logging to file specified at cmd line. + # NB, g_outputDir should be adjusted for Windows prior. if g_logToFile: utilsUpdateLogPath() @@ -1619,7 +1697,7 @@ def cliInitialize() -> None: g_progressBarWindow = ProgressBarWindow(bar_format) # Log basic info about the script and settings. - utilsLogBasicInfo() + utilsLogBasicScriptInfo() # Start USB command handler directly. usbCommandHandler() @@ -1691,7 +1769,7 @@ def main() -> int: g_logger.info("Host script exited!") if g_isWindows10: print(COLOR_RESET) - except Exception as e: + except: utilsLogException(traceback.format_exc()) try: From d6d9699b99c4b3c95e18c5ce733b350ecbbf1da7 Mon Sep 17 00:00:00 2001 From: bilditup1 Date: Mon, 4 Dec 2023 17:35:25 -0500 Subject: [PATCH 07/10] match upstream var dec --- host/nxdt_host.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/host/nxdt_host.py b/host/nxdt_host.py index ac851a8e..93d066a6 100644 --- a/host/nxdt_host.py +++ b/host/nxdt_host.py @@ -321,8 +321,8 @@ g_logPath: str = '' g_pathSep: str = '' -g_logLevelIntVar: Optional[tk.IntVar] = None -g_logToFileBoolVar: Optional[tk.BooleanVar] = None +g_logLevelIntVar: tk.IntVar | None = None +g_logToFileBoolVar: tk.BooleanVar | None = None g_osType: str = '' g_osVersion: str = '' @@ -367,7 +367,6 @@ g_nspFilePath: str = '' g_extractedFsDumpMode: bool = False -g_layeredFsDumpMode: bool = False g_formattedFileSize: float = 0 g_fileSizeMiB: float = 0 From 7e529298a27b504aac342c8db56b60241cdd9d5e Mon Sep 17 00:00:00 2001 From: bilditup1 Date: Mon, 4 Dec 2023 18:29:46 -0500 Subject: [PATCH 08/10] Minor whitespace/fmting --- host/nxdt_host.py | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/host/nxdt_host.py b/host/nxdt_host.py index 93d066a6..8738aab2 100644 --- a/host/nxdt_host.py +++ b/host/nxdt_host.py @@ -434,7 +434,7 @@ def log_to_file (self, msg: str) -> None: # Reference: https://beenje.github.io/blog/posts/logging-to-a-tkinter-scrolledtext-widget. class LogConsole: - def __init__(self, scrolled_text: Optional[scrolledtext.ScrolledText] = None): + def __init__(self, scrolled_text: scrolledtext.ScrolledText | None = None) -> None: assert g_logger is not None self.scrolled_text = scrolled_text @@ -838,7 +838,6 @@ def usbGetDeviceEndpoints() -> bool: return True def usbRead(size: int, timeout: int = -1) -> bytes: - rd = b'' try: @@ -848,12 +847,11 @@ def usbRead(size: int, timeout: int = -1) -> bytes: if not g_cliMode: utilsLogException(traceback.format_exc()) if g_logger is not None: - g_logger.error('\nUSB timeout triggered or console disconnected.\n') + g_logger.error('\nUSB timeout triggered or console disconnected.') return rd def usbWrite(data: bytes, timeout: int = -1) -> int: - wr = 0 try: @@ -862,7 +860,7 @@ def usbWrite(data: bytes, timeout: int = -1) -> int: if not g_cliMode: utilsLogException(traceback.format_exc()) if g_logger is not None: - g_logger.error('\nUSB timeout triggered or console disconnected.\n') + g_logger.error('\nUSB timeout triggered or console disconnected.') return wr @@ -875,6 +873,9 @@ def usbHandleStartSession(cmd_block: bytes) -> int: assert g_logger is not None + if g_cliMode: + print() + g_logger.debug(f'\nReceived StartSession ({USB_CMD_START_SESSION:02X}) command.') # Parse command block. @@ -1064,7 +1065,7 @@ def usbHandleSendFileProperties(cmd_block: bytes) -> int | None: dirpath = os.path.dirname(fullpath) printable_fullpath = (fullpath[4:] if g_isWindows else fullpath) - + # Check if we're dealing with an empty file or with the first SendFileProperties command from a NSP. if (not file_size) or (g_nspTransferMode and file_size == g_nspSize): # Close file (if needed). @@ -1253,14 +1254,14 @@ def usbHandleSendNspHeader(cmd_block: bytes) -> int: def usbHandleEndSession(cmd_block: bytes) -> int: assert g_logger is not None - g_logger.debug(f'\nReceived EndSession ({USB_CMD_END_SESSION:02X}) command.') + g_logger.debug(f'Received EndSession ({USB_CMD_END_SESSION:02X}) command.') return USB_STATUS_SUCCESS def usbHandleStartExtractedFsDump(cmd_block: bytes) -> int: assert g_logger is not None global g_isWindows, g_outputDir, g_extractedFsDumpMode, g_extractedFsAbsRoot, g_formattedFileSize, g_formattedFileUnit, g_fileSizeMiB, g_startTime - g_logger.debug(f'\nReceived StartExtractedFsDump ({USB_CMD_START_EXTRACTED_FS_DUMP:02X}) command.') + g_logger.debug(f'Received StartExtractedFsDump ({USB_CMD_START_EXTRACTED_FS_DUMP:02X}) command.') if g_nspTransferMode: g_logger.error('\nStartExtractedFsDump received mid NSP transfer.\n') @@ -1300,7 +1301,6 @@ def usbHandleStartExtractedFsDump(cmd_block: bytes) -> int: else: g_logger.debug(f'Starting extracted FS dump (size 0x{extracted_fs_size:X}, output relative path "{extracted_fs_root_path}").') - #g_extractedFsAbsRoot = extracted_fs_root_path g_extractedFsAbsRoot = os.path.abspath(g_outputDir + os.path.sep + extracted_fs_root_path) g_extractedFsAbsRoot = utilsStripWinPrefix(g_extractedFsAbsRoot) #if g_isWindows else g_extractedFsAbsRoot g_extractedFsDumpMode = True @@ -1312,7 +1312,8 @@ def usbHandleStartExtractedFsDump(cmd_block: bytes) -> int: def usbHandleEndExtractedFsDump(cmd_block: bytes) -> int: global g_extractedFsDumpMode, g_extractedFsAbsRoot assert g_logger is not None - g_logger.debug(f'\nReceived EndExtractedFsDump ({USB_CMD_END_EXTRACTED_FS_DUMP:02X}) command.') + g_logger.debug(f'Received EndExtractedFsDump ({USB_CMD_END_EXTRACTED_FS_DUMP:02X}) command.') + g_logger.info(f'Finished extracted FS dump.') if not g_logVerbose: g_logger.info(f'\nExtracted FS dump complete!\n') utilsLogTransferStats(time.time() - g_startTime) From d8739fc514c0fb1a5d26e9bb163365b71cbf225d Mon Sep 17 00:00:00 2001 From: bilditup1 Date: Mon, 4 Dec 2023 23:46:41 -0500 Subject: [PATCH 09/10] readd changes, made one line work in py3.11, otherwise minor --- host/nxdt_host.py | 86 +++++++++++++++++++---------------------------- 1 file changed, 34 insertions(+), 52 deletions(-) diff --git a/host/nxdt_host.py b/host/nxdt_host.py index c5b61993..28ec3766 100644 --- a/host/nxdt_host.py +++ b/host/nxdt_host.py @@ -55,7 +55,7 @@ from tqdm import tqdm -from argparse import ArgumentParser, RawTextHelpFormatter, ArgumentDefaultsHelpFormatter +from argparse import ArgumentParser, RawTextHelpFormatter from io import BufferedWriter from typing import Generator, Any, Callable @@ -320,6 +320,7 @@ g_outputDir: str = '' g_logLevelIntVar: tk.IntVar | None = None g_logToFileBoolVar: tk.BooleanVar | None = None + g_logPath: str = '' g_pathSep: str = '' @@ -632,6 +633,9 @@ def utilsLogException(exception_str: str) -> None: if (not g_cliMode) and (g_logger is not None): g_logger.debug(exception_str) +def utilsGetExt(path_arg: str) -> str: + return os.path.splitext(path_arg)[1][1:] + def utilsGetPath(path_arg: str, fallback_path: str, is_file: bool, create: bool = False) -> str: path = os.path.abspath(os.path.expanduser(os.path.expandvars(path_arg if path_arg else fallback_path))) @@ -773,7 +777,7 @@ def usbGetDeviceEndpoints() -> bool: global g_usbEpIn, g_usbEpOut, g_usbEpMaxPacketSize, g_usbVer assert g_logger is not None - assert g_stopEvent is not None + #assert g_stopEvent is not None cur_dev: Generator[usb.core.Device, Any, None] | None = None prev_dev: usb.core.Device | None = None @@ -944,7 +948,7 @@ def usbHandleSendFileProperties(cmd_block: bytes) -> int | None: if not g_nspTransferMode and not g_extractedFsDumpMode: fp = filename.split('/') # fp[0] is '/' character fp_len = len(fp) - ext = filename[-3:] + ext = utilsGetExt(filename) utilsInitTransferVars(file_size) g_logger.info("\nTransfer started!") if (fp_len >= 2): # if file has parent directories @@ -973,7 +977,7 @@ def usbHandleSendFileProperties(cmd_block: bytes) -> int | None: g_logger.info(f'\t{fp[4][:fp[4].index(".")]}') g_logger.info(f'Size:\t{g_formattedFileSize:.2f} {g_formattedFileUnit}\n') case 'NCA' | 'HFS': - printable_ext = fp[fp_len-1][fp[fp_len-1].index(".")+1:] if fp[1] == 'HFS' else ext + printable_ext = ext#fp[fp_len-1][fp[fp_len-1].index(".")+1:] if fp[1] == 'HFS' else ext g_logger.info(f'\nType:\t{fp[1]} ({fp[2]}) [{printable_ext.upper()}]') g_logger.info(f'Src:\t{fp[3][:fp[3].index("[")]}') g_logger.info(f'\t{fp[3][fp[3].index("["):]}') @@ -986,10 +990,10 @@ def usbHandleSendFileProperties(cmd_block: bytes) -> int | None: g_logger.warning(f'\n\tNovel source!!! {fp[1]}???') g_logger.info:(f'{filename}') g_logger.info(f'Size:\t{g_formattedFileSize:.2f} {g_formattedFileUnit}\n') - else: # if file is just a file with no parent dirs + else: # If file is just a file with no parent dirs if (ext == 'bin'): # Thusfar the only such case g_logger.info(f'\nType:\tConsole LAFW BLOB or similar (BIN)') - g_logger.info(f'File:\t{filename.rsplit('/', 1)[-1]}') + g_logger.info(f'File:\t'+filename.rsplit('/', 1)[-1]) g_logger.info(f'Size:\t{g_formattedFileSize:.2f} {g_formattedFileUnit}') else: # If we ever get here it is time for more work g_logger.warning(f'\n\tNovel source!!!') @@ -1035,13 +1039,7 @@ def usbHandleSendFileProperties(cmd_block: bytes) -> int | None: if (not g_nspTransferMode) or (g_nspFile is None): # Generate full, absolute path to the destination file. fullpath = os.path.abspath(g_outputDir + os.path.sep + filename) - printable_fullpath = (fullpath[4:] if g_isWindows else fullpath) - - printable_fullpath = (fullpath[4:] if g_isWindows else fullpath) - - # Unconditionally enable long paths in Windows. - # if g_isWindows: - # fullpath = utilsGetWinFullPath(fullpath); + printable_fullpath = utilsStripWinPrefix(fullpath) # Get parent directory path. dirpath = os.path.dirname(fullpath) @@ -1080,7 +1078,7 @@ def usbHandleSendFileProperties(cmd_block: bytes) -> int | None: fullpath = g_nspFilePath dirpath = os.path.dirname(fullpath) - printable_fullpath = (fullpath[4:] if g_isWindows else fullpath) + printable_fullpath = utilsStripWinPrefix(fullpath) # Check if we're dealing with an empty file or with the first SendFileProperties command from a NSP. if (not file_size) or (g_nspTransferMode and file_size == g_nspSize): @@ -1277,6 +1275,7 @@ def usbHandleEndSession(cmd_block: bytes) -> int: def usbHandleStartExtractedFsDump(cmd_block: bytes) -> int: assert g_logger is not None + global g_isWindows, g_outputDir, g_extractedFsDumpMode, g_extractedFsAbsRoot, g_formattedFileSize, g_formattedFileUnit, g_fileSizeMiB, g_startTime g_logger.debug(f'Received StartExtractedFsDump ({USB_CMD_START_EXTRACTED_FS_DUMP:02X}) command.') @@ -1318,8 +1317,7 @@ def usbHandleStartExtractedFsDump(cmd_block: bytes) -> int: else: g_logger.debug(f'Starting extracted FS dump (size 0x{extracted_fs_size:X}, output relative path "{extracted_fs_root_path}").') - g_extractedFsAbsRoot = os.path.abspath(g_outputDir + os.path.sep + extracted_fs_root_path) - g_extractedFsAbsRoot = utilsStripWinPrefix(g_extractedFsAbsRoot) #if g_isWindows else g_extractedFsAbsRoot + g_extractedFsAbsRoot = utilsStripWinPrefix(os.path.abspath(g_outputDir + os.path.sep + extracted_fs_root_path)) g_extractedFsDumpMode = True g_startTime = time.time() @@ -1328,8 +1326,9 @@ def usbHandleStartExtractedFsDump(cmd_block: bytes) -> int: def usbHandleEndExtractedFsDump(cmd_block: bytes) -> int: assert g_logger is not None + global g_extractedFsDumpMode, g_extractedFsAbsRoot g_logger.debug(f'Received EndExtractedFsDump ({USB_CMD_END_EXTRACTED_FS_DUMP:02X}) command.') - g_logger.info(f'Finished extracted FS dump.') + g_logger.debug(f'\nFinished extracted FS dump.') if not g_logVerbose: g_logger.info(f'\nExtracted FS dump complete!\n') utilsLogTransferStats(time.time() - g_startTime) @@ -1438,25 +1437,10 @@ def uiStopServer() -> None: def uiStartServer() -> None: - assert g_tkDirText is not None - - g_outputDir = g_tkDirText.get('1.0', tk.END).strip() - if not g_outputDir: - # We should never reach this, honestly. - messagebox.showerror('Error', 'You must provide an output directory!', parent=g_tkRoot) - return - - # Unconditionally enable 32-bit paths on Windows. - if g_isWindows: - g_outputDir = '\\\\?\\' + g_outputDir - - # Make sure the full directory tree exists. - try: - os.makedirs(g_outputDir, exist_ok=True) - except: - utilsLogException(traceback.format_exc()) - messagebox.showerror('Error', 'Unable to create full output directory tree!', parent=g_tkRoot) - return + uiUpdateOutputDir() + # Set new log path for this session if logging to file is turned on. + if g_logToFile: + utilsUpdateLogPath() # Update UI. uiToggleElements(False) @@ -1473,6 +1457,7 @@ def uiToggleElements(flag: bool) -> None: assert g_tkChooseDirButton is not None assert g_tkServerButton is not None assert g_tkCanvas is not None + assert g_tkLogToFileCheckbox is not None assert g_tkVerboseCheckbox is not None if flag: @@ -1554,12 +1539,14 @@ def uiHandleLogToFileCheckbox() -> None: assert g_logToFileBoolVar is not None global g_logToFile g_logToFile = g_logToFileBoolVar.get() - return def uiHandleVerboseCheckbox() -> None: assert g_logger is not None assert g_logLevelIntVar is not None - g_logger.setLevel(g_logLevelIntVar.get()) + global g_logVerbose + logLevel=g_logLevelIntVar.get() + g_logger.setLevel(logLevel) + g_logVerbose = True if(logLevel == logging.DEBUG) else False def uiInitialize() -> None: global SCALE, g_logLevelIntVar, g_logToFileBoolVar, g_logToFile, g_logVerbose @@ -1703,11 +1690,10 @@ def uiInitialize() -> None: def cliInitialize() -> None: global g_progressBarWindow, g_outputDir, g_logToFile - assert g_logger is not None # Unconditionally enable long paths on Windows. - g_outputDir = utilsGetWinFullPath(g_outputDir) #if g_isWindows else g_outputDir + g_outputDir = utilsGetWinFullPath(g_outputDir) # Determines whether to use colors in terminal and sets up accordingly. utilsSetupTerminal() @@ -1723,13 +1709,8 @@ def cliInitialize() -> None: bar_format = '{percentage:.2f}% |{bar}| {n:.2f}/{total:.2f} [{elapsed}<{remaining}, {rate_fmt}]' g_progressBarWindow = ProgressBarWindow(bar_format) - # Print info. - g_logger.info(f'\n{SCRIPT_TITLE}. {COPYRIGHT_TEXT}.') - g_logger.info(f'Output directory: "{g_outputDir}".\n') - - # Unconditionally enable 32-bit paths on Windows. - if g_isWindows: - g_outputDir = '\\\\?\\' + g_outputDir + # Log basic info about the script and settings. + utilsLogBasicScriptInfo() # Start USB command handler directly. usbCommandHandler() @@ -1739,11 +1720,11 @@ def main() -> int: # Disable warnings. warnings.filterwarnings("ignore") - - # Parse command line arguments. - parser = ArgumentParser(description=SCRIPT_TITLE + '. ' + COPYRIGHT_TEXT + '.') + + parser = ArgumentParser(formatter_class=RawTextHelpFormatter, description=SCRIPT_TITLE + '. ' + COPYRIGHT_TEXT + '.') parser.add_argument('-c', '--cli', required=False, action='store_true', default=False, help='Start the script in CLI mode.') - parser.add_argument('-o', '--outdir', required=False, type=str, metavar='DIR', help=f'Path to output directory. Defaults to "{DEFAULT_DIR}".') + parser.add_argument('-o', '--outdir', required=False, type=str, metavar='DIR', help='Path to output directory; will attempt to create if non-existent.\nDefaults to "' + DEFAULT_DIR + '".') + parser.add_argument('-l', '--log', required=False, action='store_true', default=False, help='Enables logging to file in output directory in CLI mode.') parser.add_argument('-v', '--verbose', required=False, action='store_true', default=False, help='Enable verbose output.') args = parser.parse_args() @@ -1797,7 +1778,8 @@ def main() -> int: ret = main() except KeyboardInterrupt: time.sleep(0.2) - print('\nScript interrupted.') + g_logger.info("Host script exited!") + if g_isWindows10: print(COLOR_RESET) except: utilsLogException(traceback.format_exc()) From 360ebc6245c15e42f78eb851634977f72a31adce Mon Sep 17 00:00:00 2001 From: bilditup1 Date: Fri, 8 Dec 2023 23:44:03 -0500 Subject: [PATCH 10/10] added file counter; win32 longfile mod reverted to 'just-in-time' --- host/nxdt_host.py | 138 +++++++++++++++++++++++++--------------------- 1 file changed, 74 insertions(+), 64 deletions(-) diff --git a/host/nxdt_host.py b/host/nxdt_host.py index 28ec3766..40a23bdf 100644 --- a/host/nxdt_host.py +++ b/host/nxdt_host.py @@ -340,6 +340,7 @@ g_tkTipMessage: int = 0 g_tkScrolledTextLog: scrolledtext.ScrolledText | None = None g_tkVerboseCheckbox: tk.Checkbutton | None = None +g_tkLogToFileCheckbox: tk.Checkbutton | None = None g_logger: logging.Logger | None = None @@ -544,7 +545,6 @@ def start(self, total: int, n: int = 0, divider: int = 1, prefix: str = '', unit self.unit = unit if self.tk_pbar: - #print() self.tk_pbar.configure(maximum=self.total_div, mode='determinate') self.start_time = time.time() else: @@ -633,12 +633,13 @@ def utilsLogException(exception_str: str) -> None: if (not g_cliMode) and (g_logger is not None): g_logger.debug(exception_str) +# Returns the extension of the given file without leading dot def utilsGetExt(path_arg: str) -> str: return os.path.splitext(path_arg)[1][1:] def utilsGetPath(path_arg: str, fallback_path: str, is_file: bool, create: bool = False) -> str: path = os.path.abspath(os.path.expanduser(os.path.expandvars(path_arg if path_arg else fallback_path))) - + if not is_file and create: os.makedirs(path, exist_ok=True) @@ -647,18 +648,23 @@ def utilsGetPath(path_arg: str, fallback_path: str, is_file: bool, create: bool return path +# MSYS2 uses regular slashes by default, so we unconditionally +# change this to backslashes for consistency +def utilsGetWinSeparatedPath(path_arg: str) -> str: + global g_isWindows + return path_arg.replace('/', '\\') if g_isWindows else path_arg + # Prepends `\\?\` to enable ~64KiB long paths in Windows. # ref0: https://learn.microsoft.com/en-us/windows/win32/fileio/naming-a-file?redirectedfrom=MSDN#win32-file-namespaces # ref1: https://learn.microsoft.com/en-us/windows/win32/fileio/maximum-file-path-limitation?tabs=registry # ref2: https://stackoverflow.com/a/15373771 -# Also replaces '/' separator with proper '\' in case running under MSYS2 env. - +# Also replaces '/' separator in case running under MSYS2 env. def utilsGetWinFullPath(path_arg: str) -> str: global g_isWindows - return ('\\\\?\\' + path_arg.replace("/", "\\")) if g_isWindows else path_arg + return ('\\\\?\\' + utilsGetWinSeparatedPath(path_arg)) if g_isWindows else path_arg # Strips the preceding prefix when we want to print - +# Does /not/ actually check if it's there, do not call if prefix absent! def utilsStripWinPrefix(path_arg: str) -> str: global g_isWindows return path_arg[4:] if g_isWindows else path_arg @@ -666,12 +672,10 @@ def utilsStripWinPrefix(path_arg: str) -> str: # Updates the path of the log def utilsUpdateLogPath() -> None: global g_logPath - g_logPath = os.path.abspath(g_outputDir + os.path.sep + \ - "nxdt_host_" + datetime.now().strftime('%Y-%m-%d_%H%M%S') + '.log') - return + g_logPath = utilsGetWinFullPath(os.path.abspath(g_outputDir + os.path.sep + \ + "nxdt_host_" + datetime.now().strftime('%Y-%m-%d_%H%M%S') + '.log')) # Enable terminal colors on *nix and supported Windows (10.0.10586+) - def utilsSetupTerminal() -> None: global g_terminalColors @@ -703,11 +707,8 @@ def utilsLogBasicScriptInfo() -> None: g_logger.info('\nServer started...\n') g_logger.info('Sys:\tPython ' + platform.python_version() + " on "+ g_osType+" "+g_osVersion) - # if g_isWindows: - # g_logger.info('Dst:\t' + utilsStripWinPrefix(g_outputDir)) - # else: - # g_logger.info('Dst:\t' + g_outputDir) - g_logger.info('Dst:\t' + utilsStripWinPrefix(g_outputDir)) + # Forward slashes changed to back in case running inside MSYS2 + g_logger.info('Dst:\t' + utilsGetWinSeparatedPath(g_outputDir)) if g_logToFile: g_logger.info('Log:\t' + g_logPath.rsplit(g_pathSep, 1)[-1]) @@ -719,14 +720,15 @@ def utilsLogBasicScriptInfo() -> None: else: g_logger.info('Verbose logging is disabled.\n') - return - # On successful transfer, log elapsed time and (within reason) average transfer speed -def utilsLogTransferStats(elapsed_time: float) -> None: +def utilsLogTransferStats(elapsed_time: float, file_counter = 0) -> None: global g_formattedFileSize, g_formattedFileUnit, g_fileSizeMiB if g_formattedFileUnit != "B": formatted_time = f'{elapsed_time:.2f}s' if round(elapsed_time < 60) else tqdm.format_interval(elapsed_time) - g_logger.info(f'{g_formattedFileSize:.2f} {g_formattedFileUnit} transferred in {formatted_time}.') + formatted_info = f'{g_formattedFileSize:.2f} {g_formattedFileUnit}' + formatted_info += f' in {file_counter} files' if file_counter > 1 else ' in one file' + formatted_info += f' transferred in {formatted_time}.' + g_logger.info(formatted_info) if elapsed_time > float(1): g_logger.info(f'Avg speed: {g_fileSizeMiB/elapsed_time:.2f} MiB/s\n') else: @@ -736,7 +738,7 @@ def utilsIsValueAlignedToEndpointPacketSize(value: int) -> bool: return bool((value & (g_usbEpMaxPacketSize - 1)) == 0) def utilsResetNspInfo(delete: bool = False) -> None: - global g_nspTransferMode, g_nspSize, g_nspHeaderSize, g_nspRemainingSize, g_nspFile, g_nspFilePath + global g_nspTransferMode, g_nspSize, g_nspHeaderSize, g_nspRemainingSize, g_nspFile, g_nspFilePath, g_fileCounter if g_nspFile: g_nspFile.close() @@ -750,6 +752,8 @@ def utilsResetNspInfo(delete: bool = False) -> None: g_nspRemainingSize = 0 g_nspFile = None g_nspFilePath = '' + + g_fileCounter = 0 def utilsGetSizeUnitAndDivisor(size: int) -> tuple[str, int]: size_suffixes = [ 'B', 'KiB', 'MiB', 'GiB' ] @@ -777,7 +781,7 @@ def usbGetDeviceEndpoints() -> bool: global g_usbEpIn, g_usbEpOut, g_usbEpMaxPacketSize, g_usbVer assert g_logger is not None - #assert g_stopEvent is not None + #assert g_stopEvent is not None cur_dev: Generator[usb.core.Device, Any, None] | None = None prev_dev: usb.core.Device | None = None @@ -891,9 +895,6 @@ def usbHandleStartSession(cmd_block: bytes) -> int: global g_nxdtVersionMajor, g_nxdtVersionMinor, g_nxdtVersionMicro, g_nxdtAbiVersionMajor, g_nxdtAbiVersionMinor, g_nxdtGitCommit assert g_logger is not None - - if g_cliMode: - print() g_logger.debug(f'\nReceived StartSession ({USB_CMD_START_SESSION:02X}) command.') @@ -924,7 +925,7 @@ def usbHandleStartSession(cmd_block: bytes) -> int: def usbHandleSendFileProperties(cmd_block: bytes) -> int | None: global g_nspTransferMode, g_nspSize, g_nspHeaderSize, g_nspRemainingSize, g_nspFile, g_nspFilePath, g_outputDir, g_tkRoot, g_progressBarWindow - global g_formattedFileSize, g_formattedFileUnit, g_fileSizeMiB, g_startTime + global g_formattedFileSize, g_formattedFileUnit, g_fileSizeMiB, g_startTime, g_fileCounter assert g_logger is not None assert g_progressBarWindow is not None @@ -948,7 +949,7 @@ def usbHandleSendFileProperties(cmd_block: bytes) -> int | None: if not g_nspTransferMode and not g_extractedFsDumpMode: fp = filename.split('/') # fp[0] is '/' character fp_len = len(fp) - ext = utilsGetExt(filename) + ext = utilsGetExt(filename) ##fp[fp_len-1][fp[fp_len-1].index(".")+1:] utilsInitTransferVars(file_size) g_logger.info("\nTransfer started!") if (fp_len >= 2): # if file has parent directories @@ -977,8 +978,7 @@ def usbHandleSendFileProperties(cmd_block: bytes) -> int | None: g_logger.info(f'\t{fp[4][:fp[4].index(".")]}') g_logger.info(f'Size:\t{g_formattedFileSize:.2f} {g_formattedFileUnit}\n') case 'NCA' | 'HFS': - printable_ext = ext#fp[fp_len-1][fp[fp_len-1].index(".")+1:] if fp[1] == 'HFS' else ext - g_logger.info(f'\nType:\t{fp[1]} ({fp[2]}) [{printable_ext.upper()}]') + g_logger.info(f'\nType:\t{fp[1]} ({fp[2]}) [{ext.upper()}]') g_logger.info(f'Src:\t{fp[3][:fp[3].index("[")]}') g_logger.info(f'\t{fp[3][fp[3].index("["):]}') if fp[1] == 'NCA': @@ -999,6 +999,7 @@ def usbHandleSendFileProperties(cmd_block: bytes) -> int | None: g_logger.warning(f'\n\tNovel source!!!') g_logger.info:(f'"{filename}"') else: + g_fileCounter += 1 (unit,div) = utilsGetSizeUnitAndDivisor(file_size) fs = file_size / div if g_extractedFsDumpMode: @@ -1038,19 +1039,19 @@ def usbHandleSendFileProperties(cmd_block: bytes) -> int | None: # Perform additional validity checks and get a file object to work with. if (not g_nspTransferMode) or (g_nspFile is None): # Generate full, absolute path to the destination file. - fullpath = os.path.abspath(g_outputDir + os.path.sep + filename) - printable_fullpath = utilsStripWinPrefix(fullpath) + fullpath = utilsGetWinFullPath(os.path.abspath(g_outputDir + os.path.sep + filename)) + #printable_fullpath = utilsStripWinPrefix(fullpath) # Get parent directory path. dirpath = os.path.dirname(fullpath) - + # Create full directory tree. os.makedirs(dirpath, exist_ok=True) # Make sure the output filepath doesn't point to an existing directory. if os.path.exists(fullpath) and (not os.path.isfile(fullpath)): utilsResetNspInfo() - g_logger.error(f'Output filepath points to an existing directory! ("{printable_fullpath}").\n') + g_logger.error(f'Output filepath points to an existing directory! ("{fullpath}").\n') #printable_fullpath return USB_STATUS_HOST_IO_ERROR # Make sure we have enough free space. @@ -1078,7 +1079,7 @@ def usbHandleSendFileProperties(cmd_block: bytes) -> int | None: fullpath = g_nspFilePath dirpath = os.path.dirname(fullpath) - printable_fullpath = utilsStripWinPrefix(fullpath) + #printable_fullpath = utilsStripWinPrefix(fullpath) # Check if we're dealing with an empty file or with the first SendFileProperties command from a NSP. if (not file_size) or (g_nspTransferMode and file_size == g_nspSize): @@ -1092,7 +1093,7 @@ def usbHandleSendFileProperties(cmd_block: bytes) -> int | None: usbSendStatus(USB_STATUS_SUCCESS) # Start data transfer stage. - g_logger.debug(f'Data transfer started. Saving {file_type_str} to: "{printable_fullpath}".') + g_logger.debug(f'Data transfer started. Saving {file_type_str} to: "{fullpath}".') # printable_fullpath offset = 0 blksize = USB_TRANSFER_BLOCK_SIZE @@ -1105,7 +1106,7 @@ def usbHandleSendFileProperties(cmd_block: bytes) -> int | None: prefix = '' else: prefix = f'Current {file_type_str}: "{os.path.basename(filename)}".\n' - prefix += 'Use your console to cancel the file transfer if you wish to do so.' + prefix += 'Hold \'B\' on your console to cancel the file transfer if you wish to do so.' if (not g_nspTransferMode) or g_nspRemainingSize == (g_nspSize - g_nspHeaderSize): if not g_nspTransferMode: @@ -1139,9 +1140,6 @@ def cancelTransfer(): # Start transfer process. start_time = time.time() - # if not g_nspTransferMode: - # g_startTime = start_time - # print(f'1: {g_startTime}') while offset < file_size: # Update block size (if needed). @@ -1208,7 +1206,7 @@ def cancelTransfer(): g_logger.info('\nTransfer complete!\n') utilsLogTransferStats(elapsed_time) g_logger.info('Your file may be found here:') - g_logger.info(f'{printable_fullpath}\n') + g_logger.info(f'{utilsStripWinPrefix(fullpath)}\n') return USB_STATUS_SUCCESS @@ -1226,7 +1224,7 @@ def usbHandleCancelFileTransfer(cmd_block: bytes) -> int: return USB_STATUS_MALFORMED_CMD def usbHandleSendNspHeader(cmd_block: bytes) -> int: - global g_nspTransferMode, g_nspHeaderSize, g_nspRemainingSize, g_nspFile, g_nspFilePath, g_isWindows + global g_nspTransferMode, g_nspHeaderSize, g_nspRemainingSize, g_nspFile, g_nspFilePath, g_isWindows, g_fileCounter assert g_logger is not None assert g_nspFile is not None @@ -1252,16 +1250,14 @@ def usbHandleSendNspHeader(cmd_block: bytes) -> int: g_nspFile.seek(0) g_nspFile.write(cmd_block) - # Log successful NSP transfer (header distinguishes it from constituent NCA transfers) - + # Log successful NSP transfer (header distinguishes it from constituent transfers) g_logger.debug(f'Successfully wrote 0x{nsp_header_size:X}-byte long NSP header to "{g_nspFilePath}".') if not g_logVerbose: g_logger.info('\nTransfer complete!\n') - utilsLogTransferStats(time.time() - g_startTime) + utilsLogTransferStats(time.time() - g_startTime, g_fileCounter) g_logger.info('Your file may be found here:') - g_nspFilePath = utilsStripWinPrefix(g_nspFilePath) #if g_isWindows else g_nspFilePath - g_logger.info(f'{g_nspFilePath}\n') + g_logger.info(f'{utilsStripWinPrefix(g_nspFilePath)}\n') # Disable NSP transfer mode. utilsResetNspInfo() @@ -1275,7 +1271,7 @@ def usbHandleEndSession(cmd_block: bytes) -> int: def usbHandleStartExtractedFsDump(cmd_block: bytes) -> int: assert g_logger is not None - global g_isWindows, g_outputDir, g_extractedFsDumpMode, g_extractedFsAbsRoot, g_formattedFileSize, g_formattedFileUnit, g_fileSizeMiB, g_startTime + global g_isWindows, g_outputDir, g_extractedFsDumpMode, g_extractedFsAbsRoot, g_formattedFileSize, g_formattedFileUnit, g_fileSizeMiB, g_startTime, g_fileCounter g_logger.debug(f'Received StartExtractedFsDump ({USB_CMD_START_EXTRACTED_FS_DUMP:02X}) command.') @@ -1317,8 +1313,9 @@ def usbHandleStartExtractedFsDump(cmd_block: bytes) -> int: else: g_logger.debug(f'Starting extracted FS dump (size 0x{extracted_fs_size:X}, output relative path "{extracted_fs_root_path}").') - g_extractedFsAbsRoot = utilsStripWinPrefix(os.path.abspath(g_outputDir + os.path.sep + extracted_fs_root_path)) + g_extractedFsAbsRoot = utilsGetWinSeparatedPath(os.path.abspath(g_outputDir + os.path.sep + extracted_fs_root_path)) + g_pathSep g_extractedFsDumpMode = True + g_fileCounter = 0 g_startTime = time.time() # Return status code. @@ -1326,16 +1323,17 @@ def usbHandleStartExtractedFsDump(cmd_block: bytes) -> int: def usbHandleEndExtractedFsDump(cmd_block: bytes) -> int: assert g_logger is not None - global g_extractedFsDumpMode, g_extractedFsAbsRoot + global g_extractedFsDumpMode, g_extractedFsAbsRoot, g_fileCounter g_logger.debug(f'Received EndExtractedFsDump ({USB_CMD_END_EXTRACTED_FS_DUMP:02X}) command.') g_logger.debug(f'\nFinished extracted FS dump.') if not g_logVerbose: g_logger.info(f'\nExtracted FS dump complete!\n') - utilsLogTransferStats(time.time() - g_startTime) + utilsLogTransferStats(time.time() - g_startTime, g_fileCounter) g_logger.info(f'Your files may be found here:') - g_logger.info(f'{g_extractedFsAbsRoot}{g_pathSep}\n') + g_logger.info(f'{g_extractedFsAbsRoot}\n') g_extractedFsDumpMode = False g_extractedFsAbsRoot = "" + g_fileCounter = 0 return USB_STATUS_SUCCESS def usbCommandHandler() -> None: @@ -1437,7 +1435,7 @@ def uiStopServer() -> None: def uiStartServer() -> None: - uiUpdateOutputDir() + uiUpdateOutputDirectory() # Set new log path for this session if logging to file is turned on. if g_logToFile: utilsUpdateLogPath() @@ -1484,11 +1482,12 @@ def uiToggleElements(flag: bool) -> None: g_tkVerboseCheckbox.configure(state='disabled') def uiChooseDirectory() -> None: + assert g_tkDirText is not None dirtext = g_tkDirText.get('1.0', tk.END).strip() initdir = dirtext if os.path.exists(dirtext) else INITIAL_DIR # Using \\?\ longfile syntax for Windows will not work for tkinter.filedialog.askdirectory's initialdir parameter. - # User must choose a directory considered the 'normal' length on their system if using the UI. + # Thus, win32 users must choose a directory considered the 'normal' length on their system if using the UI. dir = filedialog.askdirectory(parent=g_tkRoot, title='Select an output directory', initialdir=initdir, mustexist=True) if dir: @@ -1499,16 +1498,18 @@ def uiUpdateDirectoryField(path: str) -> None: assert g_tkDirText is not None g_tkDirText.configure(state='normal') g_tkDirText.delete('1.0', tk.END) - g_tkDirText.insert('1.0', path) + # For consistency, change path separator on Windows under MSYS2, since + # tkinter.filedialog.askdirectory will return a fwd-slashed path in that case. + g_tkDirText.insert('1.0', utilsGetWinSeparatedPath(path)) g_tkDirText.configure(state='disabled') -def uiUpdateOutputDir() -> None: +def uiUpdateOutputDirectory() -> None: global g_outputDir assert g_tkDirText is not None - g_outputDir = utilsGetWinFullPath(g_tkDirText.get('1.0', tk.END).strip()) - # g_outputDir = utilsGetWinFullPath(g_outputDir) if g_isWindows else g_outputDir + # Note: longpaths for Windows are now set at the file rather than dir level + g_outputDir = g_tkDirText.get('1.0', tk.END).strip() if not g_outputDir: # We should never reach this, honestly. @@ -1544,7 +1545,7 @@ def uiHandleVerboseCheckbox() -> None: assert g_logger is not None assert g_logLevelIntVar is not None global g_logVerbose - logLevel=g_logLevelIntVar.get() + logLevel = g_logLevelIntVar.get() g_logger.setLevel(logLevel) g_logVerbose = True if(logLevel == logging.DEBUG) else False @@ -1692,16 +1693,15 @@ def cliInitialize() -> None: global g_progressBarWindow, g_outputDir, g_logToFile assert g_logger is not None - # Unconditionally enable long paths on Windows. - g_outputDir = utilsGetWinFullPath(g_outputDir) + # Note: longpaths for Windows are now set at the file rather than dir level # Determines whether to use colors in terminal and sets up accordingly. utilsSetupTerminal() # Set log path if logging to file specified at cmd line. - # NB, g_outputDir should be adjusted for Windows prior. if g_logToFile: utilsUpdateLogPath() + # Initialize console logger. console = LogConsole() @@ -1729,9 +1729,8 @@ def main() -> int: args = parser.parse_args() - # Update global flags. - g_cliMode = args.cli - g_outputDir = utilsGetPath(args.outdir, DEFAULT_DIR, False, True) + # Update global flags, other than g_outputDir, which should be set only after platform seciton + g_cliMode = args.cli g_logToFile = args.log g_logVerbose = args.verbose @@ -1752,7 +1751,18 @@ def main() -> int: # ANSI colors in cmd.exe min. build # ref: https://github.com/dart-lang/sdk/issues/28614#issuecomment-287282970 g_isWindows10 = (win_ver_major >= 10 and win_build >= 10586) - g_pathSep = os.path.sep if not g_isWindows else '\\' + g_pathSep = '\\' + else: + g_pathSep = os.path.sep + + # utilsGetWinFullPath() enables output directories with longfile names on Windows. + # After creation by utilsGetPath(), the Windows prefix is stripped, as setting + # the long-path on a per-file basis in GUI mode, which uses the same codepath + # for actual file dumping, is more consistent across Windows environments, + # and needs comparatively less management during logging. + # g_outputDir = utilsGetPath(args.outdir, DEFAULT_DIR, False, True) + g_outputDir = utilsStripWinPrefix(utilsGetPath(utilsGetWinFullPath(args.outdir), utilsGetWinFullPath(DEFAULT_DIR), False, True)) + # Setup logging mechanism. logging.basicConfig(level=(logging.DEBUG if g_logVerbose else logging.INFO))