.github/workflows/docs.yml (2 changes: 1 addition & 1 deletion)
@@ -40,7 +40,7 @@ jobs:
run: python -m pip install torch==2.9.0 --index-url https://download.pytorch.org/whl/test/cu130
- name: Install monarch
shell: bash -l {0}
run: python -m pip install monarch-no-torch==0.1.0.dev20250826 --find-links assets/ci
run: pip install assets/ci/monarch_no_torch-0.1.0.dev20251010-py3-none-any.whl
- name: Install torchforge
shell: bash -l {0}
env:
.github/workflows/unit_test.yaml (2 changes: 1 addition & 1 deletion)
@@ -26,7 +26,7 @@ jobs:
- name: Install pytorch
run: python -m pip install torch==2.9.0.dev20250826 --extra-index-url https://download.pytorch.org/whl/nightly/cpu
- name: Install monarch
run: python -m pip install monarch-no-torch==0.1.0.dev20250826 --find-links assets/ci
run: pip install assets/ci/monarch_no_torch-0.1.0.dev20251010-py3-none-any.whl
- name: Install torchstore
run: pip install assets/wheels/torchstore-0.1.0-py3-none-any.whl
- name: Install torchtitan
apps/grpo/main.py (30 changes: 28 additions & 2 deletions)
@@ -28,6 +28,7 @@
from forge.controller.actor import ForgeActor
from forge.controller.provisioner import init_provisioner, shutdown
from forge.data.rewards import MathReward, ThinkingReward
from forge.env import MONARCH_HOSTMESH_V1
from forge.observability.metric_actors import get_or_create_metric_logger
from forge.observability.metrics import record_metric, Reduce
from forge.observability.perf_tracker import Tracer
@@ -314,14 +315,23 @@ async def main(cfg: DictConfig):
max_res_tokens = cfg.max_res_tokens

# ---- Global setups ---- #
provisioner = None
if cfg.get("provisioner", None) is not None:
await init_provisioner(
provisioner = await init_provisioner(
ProvisionerConfig(launcher_config=LauncherConfig(**cfg.provisioner))
)
else:
provisioner = await init_provisioner()

metric_logging_cfg = cfg.get("metric_logging", {"console": {"log_per_rank": False}})
mlogger = await get_or_create_metric_logger()
await mlogger.init_backends.call_one(metric_logging_cfg)
await ts.initialize(strategy=ts.ControllerStorageVolumes())

# In the host mesh v0 case, actors on remote hosts are not able to communicate
# with one another. Therefore we use the controller as our storage volume.
if not MONARCH_HOSTMESH_V1.get_value():
await ts.initialize(strategy=ts.ControllerStorageVolumes())
print("Torchstore successfully initialized with controller storage strategy")

# ---- Setup services ---- #

@@ -351,6 +361,22 @@ async def main(cfg: DictConfig):

print("All services initialized successfully!")

# In the HostMesh v1 case, we spawn a torchstore storage volume
# per trainer process.
# We initialize after service initialization because torchstore currently
# requires access to the underlying proc meshes in the local rank strategy.
# We should be able to hide this in the future.
if MONARCH_HOSTMESH_V1.get_value():
# TODO: support multiple host meshes
trainer_num_procs = cfg.actors.trainer["procs"]
trainer_host_mesh_name = cfg.actors.trainer["mesh_name"]
trainer_hosts = provisioner.get_host_mesh(trainer_host_mesh_name)
await ts.initialize(
mesh=trainer_hosts.spawn_procs(per_host={"procs": trainer_num_procs}),
strategy=ts.LocalRankStrategy(),
)
print("Torchstore successfully initialized with local rank strategy")

# ---- Core RL loops ---- #
async def continuous_rollouts():
rollout_count = 0
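
A note on the two initialization paths above: both are keyed off the MONARCH_HOSTMESH_V1 flag from forge.env, and ts.initialize() is now called exactly once on whichever branch the flag selects. The PR does not show forge.env itself, so the sketch below is only an illustration of the flag pattern; the EnvFlag helper, its default, and its parsing rules are assumptions, not forge code.

```python
# Hedged sketch of an environment-backed flag with a get_value() accessor,
# mimicking how MONARCH_HOSTMESH_V1 is used in apps/grpo/main.py.
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class EnvFlag:
    name: str
    default: bool = False

    def get_value(self) -> bool:
        raw = os.environ.get(self.name)
        if raw is None:
            return self.default
        return raw.strip().lower() not in ("0", "false", "")


MONARCH_HOSTMESH_V1 = EnvFlag("MONARCH_HOSTMESH_V1")

if MONARCH_HOSTMESH_V1.get_value():
    print("HostMesh v1: torchstore volume per trainer process (LocalRankStrategy)")
else:
    print("HostMesh v0: controller-hosted storage (ControllerStorageVolumes)")
```
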
apps/grpo/qwen3_1_7b.yaml (7 changes: 7 additions & 0 deletions)
@@ -117,26 +117,33 @@ services:
policy:
procs: ${policy.engine_config.tensor_parallel_size}
num_replicas: 1
mesh_name: policy
with_gpus: true
ref_model:
procs: 1
num_replicas: 1
mesh_name: ref_model
with_gpus: true
reward_actor:
procs: 1
num_replicas: 1
mesh_name: reward_actor
with_gpus: false

actors:
dataset:
procs: 1
with_gpus: false
mesh_name: dataset
[Inline review thread on the new mesh_name fields]
Member: These have to be manually specified?
Contributor Author: there's probably a way to do this in a not silly way but alas

trainer:
procs: 1
with_gpus: true
mesh_name: trainer
replay_buffer:
procs: 1
with_gpus: false
mesh_name: replay_buffer
compute_advantages:
procs: 1
with_gpus: false
mesh_name: compute_advantages
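
The mesh_name keys added above are not decorative: main.py reads cfg.actors.trainer["mesh_name"] and hands it to provisioner.get_host_mesh() when spawning the per-trainer torchstore volumes. Below is a minimal sketch of that config lookup, assuming the YAML is loaded into an OmegaConf DictConfig as the main(cfg: DictConfig) signature suggests.

```python
# Sketch only: the YAML is trimmed from apps/grpo/qwen3_1_7b.yaml, and
# provisioner.get_host_mesh() is referenced in a comment, not reimplemented.
from omegaconf import OmegaConf

yaml_snippet = """
actors:
  trainer:
    procs: 1
    with_gpus: true
    mesh_name: trainer
"""

cfg = OmegaConf.create(yaml_snippet)

trainer_num_procs = cfg.actors.trainer["procs"]            # 1
trainer_host_mesh_name = cfg.actors.trainer["mesh_name"]   # "trainer"
print(trainer_num_procs, trainer_host_mesh_name)
# main.py then calls: provisioner.get_host_mesh(trainer_host_mesh_name)
```
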
apps/grpo/qwen3_32b.yaml (7 changes: 7 additions & 0 deletions)
@@ -122,26 +122,33 @@ services:
num_replicas: 1
hosts: 1
with_gpus: true
mesh_name: policy
ref_model:
procs: ${ref_model.parallelism.tensor_parallel_degree}
num_replicas: 1
with_gpus: true
mesh_name: ref_model
reward_actor:
procs: 1
num_replicas: 1
with_gpus: false
mesh_name: reward_actor

actors:
dataset:
procs: 1
with_gpus: false
mesh_name: dataset
trainer:
procs: 8
hosts: 1
with_gpus: true
mesh_name: trainer
replay_buffer:
procs: 1
with_gpus: false
mesh_name: replay_buffer
compute_advantages:
procs: 1
with_gpus: false
mesh_name: compute_advantages
apps/grpo/qwen3_8b.yaml (7 changes: 7 additions & 0 deletions)
@@ -117,25 +117,32 @@ services:
procs: ${policy.engine_config.tensor_parallel_size}
num_replicas: 1
with_gpus: true
mesh_name: policy
ref_model:
procs: 1
num_replicas: 1
with_gpus: true
mesh_name: ref_model
reward_actor:
procs: 1
num_replicas: 1
with_gpus: false
mesh_name: reward_actor

actors:
dataset:
procs: 1
with_gpus: false
mesh_name: dataset
trainer:
procs: 2
with_gpus: true
mesh_name: trainer
replay_buffer:
procs: 1
with_gpus: false
mesh_name: replay_buffer
compute_advantages:
procs: 1
with_gpus: false
mesh_name: compute_advantages
assets/wheels/monarch-0.0.1-cp310-cp310-linux_x86_64.whl (binary file modified; contents not shown)
scripts/build_wheels.sh (2 changes: 1 addition & 1 deletion)
@@ -17,7 +17,7 @@ NC='\033[0m'
# Configuration
PYTORCH_VERSION="2.9.0.dev20250905"
VLLM_BRANCH="v0.10.0"
MONARCH_COMMIT="6ca383aca99480aa1bf5853478d4d09fcb224035"
MONARCH_COMMIT="d1c5ea4732704454efad82db678d4e66a4131bb2"
TORCHTITAN_COMMIT="0cfbd0b3c2d827af629a107a77a9e47229c31663"
TORCHSTORE_COMMIT="eed96eb55ce87d4a9880597dd7dfd0d291e9ac81"
BUILD_DIR="$HOME/forge-build"
src/forge/actors/policy.py (5 changes: 4 additions & 1 deletion)
@@ -53,6 +53,7 @@
from forge.data.sharding import VLLMSharding
from forge.data_models.completion import Completion
from forge.data_models.prompt import to_prompt
from forge.env import TORCHSTORE_USE_RDMA
from forge.interfaces import Policy as PolicyInterface
from forge.observability.metrics import record_metric, Reduce
from forge.observability.perf_tracker import Tracer
@@ -140,7 +141,9 @@ class Policy(PolicyInterface):
engine_config: EngineConfig | Mapping = field(default_factory=EngineConfig)
sampling_config: SamplingConfig | Mapping = field(default_factory=SamplingConfig)
available_devices: str | None = None
use_dcp: bool = True
use_dcp: bool = (
TORCHSTORE_USE_RDMA.get_value() == 0
) # torchstore currently only accepts 0 or 1
# Gets set up by setup
sampling_params: SamplingParams | None = None
lora_request: LoRARequest | None = None
src/forge/actors/trainer.py (5 changes: 4 additions & 1 deletion)
@@ -46,6 +46,7 @@

from forge.controller import ForgeActor
from forge.data.utils import batch_to_device
from forge.env import TORCHSTORE_USE_RDMA
from forge.observability.metrics import record_metric, Reduce
from forge.observability.perf_tracker import Tracer

@@ -111,7 +112,9 @@ class RLTrainer(ForgeActor):
# Non JobConfig-related fields
loss: Callable = lambda logits, **targets: logits
state_dict_key: str = "model_state_dict"
use_dcp: bool = True
use_dcp: bool = (
TORCHSTORE_USE_RDMA.get_value() == 0
) # torchstore currently only accepts 0 or 1
dcp_path: str = "forge_dcp_tmp"

def __post_init__(self):
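
Both Policy.use_dcp and RLTrainer.use_dcp now default to TORCHSTORE_USE_RDMA.get_value() == 0, i.e. DCP is only used as the fallback transfer path when RDMA is off. One subtlety: a dataclass field default like this is evaluated once, when the class body runs at import time, so the environment variable must be set before the module is imported. A self-contained illustration follows; the FAKE_RDMA_FLAG name and its parsing are placeholders, not forge's implementation.

```python
# Shows when an env-driven dataclass default is evaluated; the flag name here
# is made up and only stands in for TORCHSTORE_USE_RDMA.
import os
from dataclasses import dataclass

os.environ["FAKE_RDMA_FLAG"] = "1"  # set BEFORE the class is defined


def rdma_enabled() -> int:
    return int(os.environ.get("FAKE_RDMA_FLAG", "0"))


@dataclass
class TrainerLike:
    # Evaluated once, at class-definition time -- not per instance.
    use_dcp: bool = rdma_enabled() == 0


os.environ["FAKE_RDMA_FLAG"] = "0"  # too late: the default is already baked in
print(TrainerLike().use_dcp)        # False, because RDMA was on at definition time
```
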
src/forge/controller/launcher.py (25 changes: 19 additions & 6 deletions)
@@ -15,15 +15,19 @@
import monarch

import torchx.specs as specs

from monarch._rust_bindings.monarch_hyperactor.alloc import AllocConstraints
from monarch._rust_bindings.monarch_hyperactor.channel import ChannelTransport

from monarch._rust_bindings.monarch_hyperactor.config import configure
from monarch._src.actor.allocator import RemoteAllocator, TorchXRemoteAllocInitializer
from monarch.actor import Actor, endpoint, ProcMesh
from monarch.tools import commands
from monarch.tools.commands import info
from monarch.tools.components import hyperactor
from monarch.tools.config import Config, Workspace

from forge.env import MONARCH_HOSTMESH_V1

from forge.types import Launcher, LauncherConfig

_MAST_AVAILABLE = False
@@ -110,13 +114,17 @@ async def initialize(self) -> None:
async def get_allocator(self, name: str, num_hosts: int) -> tuple[Any, Any, str]:
pass

async def remote_setup(self, procs: ProcMesh) -> tuple[str, int]:
async def remote_setup(self, procs: ProcMesh) -> None:
pass


class Slurmlauncher(BaseLauncher):
async def initialize(self) -> None:
pass
if MONARCH_HOSTMESH_V1.get_value():
# HostMeshV1 currently requires explicit configuration
# of the underlying transport from client to mesh.
# This can be removed once that requirement is lifted.
configure(default_transport=ChannelTransport.Tcp)

async def get_allocator(self, name: str, num_hosts: int) -> tuple[Any, Any, str]:
appdef = hyperactor.host_mesh(
@@ -148,7 +156,7 @@ async def get_allocator(self, name: str, num_hosts: int) -> tuple[Any, Any, str]
server_name = f"slurm:///{server_info.name}"
return alloc, None, server_name # (Allocator, AllocConstraints, SeverName)

async def remote_setup(self, procs: ProcMesh) -> tuple[str, int]:
async def remote_setup(self, procs: ProcMesh) -> None:
return


@@ -172,6 +180,12 @@ def __init__(self, cfg: LauncherConfig | None = None):
self.job_name = self.cfg.job_name or self.create_job_name()

async def initialize(self) -> None:
if MONARCH_HOSTMESH_V1.get_value():
# HostMeshV1 currently requires explicit configuration
# of the underlying transport from client to mesh.
# This can be removed once that requirement is lifted.
configure(default_transport=ChannelTransport.MetaTlsWithHostname)

await self.launch_mast_job()

async def get_allocator(self, name: str, num_hosts: int) -> tuple[Any, Any, str]:
@@ -187,10 +201,9 @@ async def get_allocator(self, name: str, num_hosts: int) -> tuple[Any, Any, str]

return allocator, alloc_constraints, self.create_server_handle()

async def remote_setup(self, procs: ProcMesh) -> tuple[str, int]:
async def remote_setup(self, procs: ProcMesh) -> None:
setup = procs.spawn(f"setup-{uuid.uuid1()}", MastSetupActor)
await setup.mount.call(mount_dst="/mnt/wsfuse")
return await setup.get_info.choose()

async def launch_mast_job(self):
handle = self.create_server_handle()
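
The launcher changes gate monarch's client-to-mesh transport behind the same flag: the Slurm launcher picks plain TCP, while the MAST launcher picks MetaTlsWithHostname. Pulled out of the class for illustration (reading the flag directly from os.environ below is an assumption; the real code goes through forge.env), the Slurm-side configuration amounts to:

```python
# Requires monarch; the configure/ChannelTransport imports mirror the ones
# this PR adds to src/forge/controller/launcher.py.
import os

from monarch._rust_bindings.monarch_hyperactor.channel import ChannelTransport
from monarch._rust_bindings.monarch_hyperactor.config import configure

if os.environ.get("MONARCH_HOSTMESH_V1", "0") == "1":
    # HostMesh v1 does not currently pick a default transport on its own,
    # so set it explicitly before any allocators are created.
    configure(default_transport=ChannelTransport.Tcp)
```
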