Skip to content

Altered ExecutionList by further grouping [fewer model reloads] #7978

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 124 additions & 52 deletions comfy_execution/graph.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
# NOTE: `from __future__` imports must be the very first statement in a module
# (comments excepted). Placing `import os` ahead of it is a SyntaxError, so the
# future import leads and `os` follows with the rest of the stdlib imports.
from __future__ import annotations

import os
from typing import Literal, Optional, Type

import nodes
from comfy_execution.graph_utils import is_link
from comfy.comfy_types.node_typing import ComfyNodeABC, InputTypeDict, InputTypeOptions

# Optional debug flag: set `MAGIX_DEBUG=1` in your env to see batch picks.
_ENABLE_MAGIX_LOGS = os.getenv("MAGIX_DEBUG", "0") == "1"

class DependencyCycleError(Exception):
    """Raised when staging finds a dependency cycle among the remaining nodes."""
    pass

Expand Down Expand Up @@ -170,86 +174,153 @@ def is_empty(self):

class ExecutionList(TopologicalSort):
"""
ExecutionList implements a topological dissolve of the graph. After a node is staged for execution,
it can still be returned to the graph after having further dependencies added.
A topological scheduler that favours executing many nodes of the same
`class_type` back-to-back. The idea is simple:

• Keep a *current group* (= a class_type). ✨
• While at least one ready node of that group exists, pick from it. ✨
• When the group is exhausted, choose the next group that has the ✨
largest number of simultaneously ready nodes. ✨
• Inside a group, keep the original UX heuristics that prioritise
early outputs / previews.

Magix add-on
------------
If the environment variable **MAGIX_DEBUG=1** is set, the scheduler
prints a one-liner each time it switches batches, e.g.:

[Magix] 🎯 Switched batch → 'CLIPTextEncode' (5 ready)

That’s all—no functional changes, no performance tax when disabled.
"""

# ------------------------------------------------------------------
def __init__(self, dynprompt, output_cache):
    """Build an execution list over `dynprompt`, reading results from `output_cache`."""
    super().__init__(dynprompt)
    self.output_cache = output_cache
    # Node currently staged for execution; None when nothing is staged.
    self.staged_node_id: Optional[str] = None
    # class_type we are currently batching; None until the first pick.
    self._current_group_class: Optional[str] = None

def is_cached(self, node_id):
    """Return True if the output cache already holds a result for `node_id`."""
    return self.output_cache.get(node_id) is not None

# ------------------------------------------------------------------
# group selection helpers
# ------------------------------------------------------------------
def _pick_largest_group(self, node_list):
"""Return the class_type that has most representatives in `node_list`."""
counts = {}
for nid in node_list:
ctype = self.dynprompt.get_node(nid)["class_type"]
counts[ctype] = counts.get(ctype, 0) + 1
# largest group wins – ties are resolved deterministically by name
return max(counts.items(), key=lambda kv: (kv[1], kv[0]))[0]

def _filter_by_group(self, node_list, group_cls):
"""Keep only nodes that belong to the given class."""
return [nid for nid in node_list
if self.dynprompt.get_node(nid)["class_type"] == group_cls]

# ------------------------------------------------------------------
# node-picking logic
# ------------------------------------------------------------------
def ux_friendly_pick_node(self, node_list):
    """
    Select the next ready node, preferring members of the current batch
    (one class_type) and, within that batch, nodes that surface outputs
    early (previews, etc.).
    """
    # -- step 1: make sure the current batch still has a ready member -----
    current = self._current_group_class
    batch_alive = current is not None and any(
        self.dynprompt.get_node(node_id)["class_type"] == current
        for node_id in node_list
    )
    if not batch_alive:
        # First call, or the previous batch ran dry — start a new batch.
        self._current_group_class = self._pick_largest_group(node_list)

        # 🌟 Magix (opt-in) log
        if _ENABLE_MAGIX_LOGS:
            ready_cnt = len(
                self._filter_by_group(node_list, self._current_group_class))
            print(f"[Magix] 🎯 Switched batch → "
                  f"'{self._current_group_class}' ({ready_cnt} ready)")

    # Candidates are the ready nodes belonging to the current batch.
    candidates = self._filter_by_group(node_list, self._current_group_class)

    def is_output(node_id):
        # A node is an "output" when its class declares OUTPUT_NODE = True.
        class_def = nodes.NODE_CLASS_MAPPINGS[
            self.dynprompt.get_node(node_id)["class_type"]]
        return getattr(class_def, 'OUTPUT_NODE', False) is True

    # 1) Run output nodes as soon as they are ready (within the batch).
    for node_id in candidates:
        if is_output(node_id):
            return node_id

    # 2) Then nodes that directly unblock an output (VAEDecode → preview).
    for node_id in candidates:
        if any(is_output(blocked) for blocked in self.blocking[node_id]):
            return node_id

    # 3) Then nodes two hops from an output (VAELoader → VAEDecode → preview).
    for node_id in candidates:
        for blocked in self.blocking[node_id]:
            if any(is_output(deeper) for deeper in self.blocking[blocked]):
                return node_id

    # 4) Otherwise simply take the first member of the batch.
    return candidates[0]

# ------------------------------------------------------------------
# staging / completion plumbing – unchanged except for group bookkeeping
# ------------------------------------------------------------------
def stage_node_execution(self):
    """
    Stage the next node for execution.

    Returns a `(node_id, error_details, exception)` triple:
    - `(node_id, None, None)` when a node was staged,
    - `(None, None, None)` when the graph is exhausted,
    - `(None, details, DependencyCycleError)` when only a cycle remains.

    Note: this span of the scraped diff interleaved removed and added lines;
    this is the reconstructed post-change implementation.
    """
    assert self.staged_node_id is None
    if self.is_empty():
        return None, None, None

    available = self.get_ready_nodes()
    if not available:
        cycled_nodes = self.get_nodes_in_cycle()
        # Cycles composed entirely of static nodes are caught during initial
        # validation, so 'blame' the first non-static node in the cycle.
        blamed_node = cycled_nodes[0]
        for nid in cycled_nodes:
            disp = self.dynprompt.get_display_node_id(nid)
            if disp != nid:
                blamed_node = disp
                break
        ex = DependencyCycleError("Dependency cycle detected")
        err = {
            "node_id": blamed_node,
            "exception_message": str(ex),
            "exception_type": "graph.DependencyCycleError",
            "traceback": [],
            "current_inputs": []
        }
        return None, err, ex

    self.staged_node_id = self.ux_friendly_pick_node(available)
    return self.staged_node_id, None, None

def ux_friendly_pick_node(self, node_list):
    """
    Pick which ready node to run next. Output nodes (e.g. PreviewImage) are
    favoured so users see results as soon as possible; this has no effect on
    total execution length, it just feels better. Further heuristics could
    improve the UX here.
    """
    def is_output(node_id):
        class_type = self.dynprompt.get_node(node_id)["class_type"]
        class_def = nodes.NODE_CLASS_MAPPINGS[class_type]
        return bool(hasattr(class_def, 'OUTPUT_NODE') and class_def.OUTPUT_NODE == True)

    # Run an output node immediately when one is ready.
    for node_id in node_list:
        if is_output(node_id):
            return node_id

    # VAEDecode -> preview: prefer a node that directly unblocks an output.
    for node_id in node_list:
        if any(is_output(blocked) for blocked in self.blocking[node_id]):
            return node_id

    # VAELoader -> VAEDecode -> preview: two hops away from an output.
    for node_id in node_list:
        for blocked in self.blocking[node_id]:
            if any(is_output(deeper) for deeper in self.blocking[blocked]):
                return node_id

    # TODO: this selection could still be improved.
    return node_list[0]

def unstage_node_execution(self):
    """Return the staged node to the pool (its execution turned out PENDING).

    `_current_group_class` is deliberately left alone — the node was not
    executed, so its batch is still the active one.
    """
    # Fix: in the diff the docstring string sat AFTER the assert, where it is
    # a no-op expression statement rather than a docstring; it belongs first.
    assert self.staged_node_id is not None
    self.staged_node_id = None

def complete_node_execution(self):
    """Mark the staged node done: pop it from the graph and clear the stage.

    Called after successful (or cached) execution. `_current_group_class` is
    kept as-is; `ux_friendly_pick_node` switches batches automatically once
    the current one runs dry.
    """
    # Reconstructed from interleaved old/new diff lines; docstring moved to
    # its proper position (it followed other statements in the diff).
    nid = self.staged_node_id
    assert nid is not None, "complete_node_execution with no node staged"
    self.pop_node(nid)
    self.staged_node_id = None

# ------------------------------------------------------------------
# cycle detection helper – untouched
# ------------------------------------------------------------------
def get_nodes_in_cycle(self):
# We'll dissolve the graph in reverse topological order to leave only the nodes in the cycle.
# We're skipping some of the performance optimizations from the original TopologicalSort to keep
Expand All @@ -269,6 +340,7 @@ def get_nodes_in_cycle(self):
to_remove = [node_id for node_id in blocked_by if len(blocked_by[node_id]) == 0]
return list(blocked_by.keys())


class ExecutionBlocker:
"""
Return this from a node and any users will be blocked with the given error message.
Expand Down