@hyeok9855 hyeok9855 commented Nov 4, 2025

  • I've read the .github/CONTRIBUTING.md file
  • My code follows the typing guidelines
  • I've added appropriate tests
  • I've run pre-commit hooks locally

Description

Major refactoring of conditional GFlowNets.

  • Add ConditionalEnv as a new abstract class for an environment with a conditional reward
  • Trajectories.conditions now has shape (n_trajectories, condition_vector_dim), which simplifies much of the shape-related logic.
  • Fix the train_conditional.py example (previously, the true_dist used for validation was wrong).
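
To make the new shape convention concrete, here is a minimal sketch of how one condition vector per trajectory can be repeated across steps when per-step conditions are needed. It assumes conditions stay constant along a trajectory and uses nested lists as stand-ins for tensors; `expand_conditions` is a hypothetical helper for illustration, not a function from the library.

```python
from typing import List

Vector = List[float]


def expand_conditions(conditions: List[Vector], max_len: int) -> List[List[Vector]]:
    """Repeat each trajectory's condition at every step (hypothetical helper).

    Input layout:  (n_trajectories, condition_vector_dim)
    Output layout: (max_len, n_trajectories, condition_vector_dim)
    """
    # Copy each vector so that steps do not share mutable lists.
    return [[list(c) for c in conditions] for _ in range(max_len)]


# Three trajectories, each with a 2-dimensional condition vector.
conditions = [[0.0, 1.0], [1.0, 0.0], [0.5, 0.5]]
per_step = expand_conditions(conditions, max_len=4)
```

With real tensors the same idea would typically be a broadcast rather than a copy; the point is only that the stored object is indexed by trajectory, not by step.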

TODO (maybe in another PR?)

  • Let ConditionalEnv support conditional transitions

@hyeok9855 hyeok9855 marked this pull request as draft November 4, 2025 01:15
@hyeok9855 hyeok9855 self-assigned this Nov 4, 2025
@hyeok9855 hyeok9855 changed the title Refactor Conditional GFlowNets [WIP] Refactor Conditional GFlowNets Nov 4, 2025
@hyeok9855 hyeok9855 mentioned this pull request Nov 19, 2025
4 tasks
@hyeok9855 hyeok9855 marked this pull request as ready for review November 20, 2025 18:44
@hyeok9855 hyeok9855 changed the title [WIP] Refactor Conditional GFlowNets Refactor Conditional GFlowNets Nov 20, 2025
Collaborator

@younik younik left a comment

Just a few comments; good to go for me, but I would wait for @josephdviviano as he understands this code better

Comment on lines +349 to +353
# Concatenate conditions of the trajectories.
if self.conditions is not None and other.conditions is not None:
    self.conditions = torch.cat((self.conditions, other.conditions), dim=0)
else:
    self.conditions = None
Collaborator

can we maybe add a test for extending with conditions, and then try common ops like get_item to check the output is as expected?

Collaborator Author

> can we maybe add a test for extending with conditions

I will add one.

> and then try common ops like get_item to check the output is as expected?

I have no idea what this means. Could you elaborate more?

Collaborator

I mean in the test, after calling extend, check if the extend operation gave the expected result.
Like here:

pre_extend_shape = state2.batch_shape
state1.extend(state2)
assert state2.batch_shape == pre_extend_shape
# Check final shape should be (max_len=3, B=4)
assert state1.batch_shape == (3, 4)
# The actual count might be higher due to padding with sink states
assert state1.tensor.x.size(0) == expected_nodes
assert state1.tensor.num_edges == expected_edges
# Check if states are extended as expected
assert (state1[0, 0].tensor.x == datas[0].x).all()
assert (state1[0, 1].tensor.x == datas[1].x).all()
assert (state1[0, 2].tensor.x == datas[4].x).all()
assert (state1[0, 3].tensor.x == datas[5].x).all()
assert (state1[1, 0].tensor.x == datas[2].x).all()
assert (state1[1, 1].tensor.x == datas[3].x).all()
assert (state1[1, 2].tensor.x == datas[6].x).all()
assert (state1[1, 3].tensor.x == datas[7].x).all()
assert (state1[2, 0].tensor.x == MyGraphStates.sf.x).all()
assert (state1[2, 1].tensor.x == MyGraphStates.sf.x).all()
assert (state1[2, 2].tensor.x == datas[8].x).all()
assert (state1[2, 3].tensor.x == datas[9].x).all()

Collaborator Author

I see. I will add a test soon!
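
A rough sketch of what such a test could look like, written against a tiny stand-in container rather than the real Trajectories class. `MiniTrajectories` is hypothetical and stores plain lists; only its `extend` mirrors the concatenate-or-drop logic from the diff above, and its `__getitem__` is an assumed integer-only indexer.

```python
from typing import List, Optional


class MiniTrajectories:
    """Hypothetical stand-in for a trajectories container (not the gfn class)."""

    def __init__(
        self, rewards: List[float], conditions: Optional[List[List[float]]] = None
    ) -> None:
        if conditions is not None:
            assert len(conditions) == len(rewards)  # one condition per trajectory
        self.rewards = list(rewards)
        self.conditions = None if conditions is None else [list(c) for c in conditions]

    def __len__(self) -> int:
        return len(self.rewards)

    def extend(self, other: "MiniTrajectories") -> None:
        self.rewards.extend(other.rewards)
        # Concatenate conditions only if both sides have them, else drop them.
        if self.conditions is not None and other.conditions is not None:
            self.conditions = self.conditions + [list(c) for c in other.conditions]
        else:
            self.conditions = None

    def __getitem__(self, idx: int) -> "MiniTrajectories":
        cond = None if self.conditions is None else [self.conditions[idx]]
        return MiniTrajectories([self.rewards[idx]], cond)


def test_extend_with_conditions() -> None:
    a = MiniTrajectories([1.0, 2.0], [[0.0, 1.0], [1.0, 0.0]])
    b = MiniTrajectories([3.0], [[0.5, 0.5]])
    a.extend(b)
    assert len(a) == 3
    assert a.conditions == [[0.0, 1.0], [1.0, 0.0], [0.5, 0.5]]
    # The argument of extend must be left untouched.
    assert len(b) == 1 and b.conditions == [[0.5, 0.5]]
    # Indexing after extend returns the matching condition row.
    assert a[2].rewards == [3.0] and a[2].conditions == [[0.5, 0.5]]


def test_extend_drops_conditions_if_one_side_has_none() -> None:
    a = MiniTrajectories([1.0], [[0.0, 1.0]])
    b = MiniTrajectories([2.0])
    a.extend(b)
    assert len(a) == 2 and a.conditions is None
```

The real test would build Trajectories from an environment and compare tensors, but the assertions would follow the same pattern: check the length, check the concatenated conditions, then index and compare.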

Comment on lines +452 to +464
def reward(self, states: States, conditions: torch.Tensor) -> torch.Tensor:
    """Compute rewards for the conditional environment.

    Args:
        states: The states to compute rewards for.
            states.tensor.shape should be (batch_size, *state_shape)
        conditions: The conditions to compute rewards for.
            conditions.shape should be (batch_size, condition_vector_dim)

    Returns:
        A tensor of shape (batch_size,) containing the rewards.
    """
    raise NotImplementedError
Collaborator

Aha, this is not a true subtype of Env, because conditions are mandatory: you can't call this method on a ConditionalEnv while treating it as a plain Env object.

Would it make sense to have a default condition?
If not, this probably shouldn't inherit from Env.

Collaborator Author

> Would it make sense to have a default condition?

How could having a default condition solve the problem?

> If not, this shouldn't inherit from Env probably.

Maybe, but still we need a parent class that defines the default methods for Envs, like reward, step, etc...

Collaborator

> How could having a default condition solve the problem?

If we have a function like this:

def get_reward(env: Env, states: States) -> torch.Tensor:
    return env.reward(states)

This should work with any Env object, given the interface of Env.

However, if I currently pass a ConditionalEnv (which is an Env), this will fail because the conditions must be specified. With a default value for the conditions, get_reward works properly: the reward signature of ConditionalEnv then becomes a subtype of the one in Env.
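
To illustrate the substitutability argument, here is a small self-contained sketch. All classes are toy stand-ins defined here, with lists of floats in place of States and tensors; the "neutral" default of all ones is an assumption made only for this example, not a proposal for what the library's default should be.

```python
from typing import List, Optional


class Env:
    """Toy base class: reward takes states only."""

    def reward(self, states: List[float]) -> List[float]:
        raise NotImplementedError


class StrictConditionalEnv(Env):
    """Conditions are mandatory, so the override is not call-compatible with Env."""

    def reward(self, states: List[float], conditions: List[float]) -> List[float]:  # type: ignore[override]
        return [s * c for s, c in zip(states, conditions)]


class DefaultConditionalEnv(Env):
    """Conditions are optional, so every call valid on Env is still valid here."""

    def reward(
        self, states: List[float], conditions: Optional[List[float]] = None
    ) -> List[float]:
        if conditions is None:
            # Assumed neutral default for illustration: a condition of 1.0 per state.
            conditions = [1.0] * len(states)
        return [s * c for s, c in zip(states, conditions)]


def get_reward(env: Env, states: List[float]) -> List[float]:
    # Written against the Env interface only.
    return env.reward(states)
```

Calling `get_reward(DefaultConditionalEnv(), [1.0, 2.0])` works, while the same call with `StrictConditionalEnv()` raises a `TypeError` for the missing `conditions` argument, which is the failure described above.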

Collaborator

An alternative approach would be to have the conditions live inside the states themselves (states could have a conditioning field that is None unless conditioning is required, and then anything that accepts States follows a different path when conditioning is present).

The env itself would only be conditional or not depending on the logic the user defines in the reward and step functions. No actual ConditionalEnv class would be required.

The estimators would also optionally use the conditioning information, if it's present, just like how it's done currently.
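
A minimal sketch of this alternative, assuming a simplified States with an optional `conditions` field. The names below (`States`, `Env`, `estimator_inputs`) are toy definitions for illustration, with lists of floats in place of tensors; they are not the library's classes.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class States:
    """Toy states container; conditions is None when no conditioning is used."""

    tensor: List[float]
    conditions: Optional[List[float]] = None


class Env:
    """Single env class; it is 'conditional' only through the user's reward logic."""

    def reward(self, states: States) -> List[float]:
        if states.conditions is None:
            # Unconditional path.
            return list(states.tensor)
        # Conditional path: scale each state's reward by its condition.
        return [x * c for x, c in zip(states.tensor, states.conditions)]


def estimator_inputs(states: States) -> List[List[float]]:
    """Build per-state input rows, appending the condition when present."""
    if states.conditions is None:
        return [[x] for x in states.tensor]
    return [[x, c] for x, c in zip(states.tensor, states.conditions)]


plain = States([1.0, 2.0])
conditioned = States([1.0, 2.0], conditions=[2.0, 3.0])
```

Here the reward signature stays `reward(states)` for every environment, and anything that receives States can branch on `states.conditions` without a separate `conditions` argument being threaded through each call.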

Collaborator

@josephdviviano josephdviviano left a comment

Overall a really nice PR, but I have a few questions about changes that seem unrelated to the goal. In particular, I think we remove a few checks, which might have side effects not captured by our test suites. I also wonder whether it would be cleaner for the conditioning to live directly within the States class, which would help avoid a lot of added complexity. We can discuss in the standup. Great work!

  self.conditions = conditions
  assert self.conditions is None or (
-     self.conditions.shape[: len(batch_shape)] == batch_shape
+     len(self.conditions.shape) == 2
Collaborator

right, because we assume the conditioning would not change through the trajectory?

self._log_rewards[self.is_terminating] = self.env.log_reward(
if isinstance(self.env, ConditionalEnv):
assert self.conditions is not None
log_reward_fn = partial(
Collaborator

nice!

# Assign rewards to valid terminating states.
terminating_mask = is_terminating & (
valid_batch_indices == (self.terminating_idx[valid_traj_indices] - 1)
log_rewards[self.terminating_idx - 1, torch.arange(len(self))] = (
Collaborator

really nice cleanup here!


  from gfn.containers import StatesContainer, Trajectories
- from gfn.env import DiscreteEnv
+ from gfn.env import Env
Collaborator

This is technically wrong because FlowMatching won't work for continuous environments.

  conditions = conditions[mask]
  with has_conditions_exception_handler("logF", self.logF):
-     log_F = self.logF(valid_states, conditions)
+     log_F = self.logF(valid_states, conditions).squeeze(-1)
Collaborator

If we move conditions into the states class, this logic can all be moved inside the estimator.

)

self._all_states_tensor = all_states_tensor
if self.store_all_states:
Collaborator

Nice, thanks for this addition :)

valid_states = trajectories.states[state_mask]
valid_actions = trajectories.actions[action_mask]

if valid_states.batch_shape != valid_actions.batch_shape:
Collaborator

Why are you removing this stuff? I thought this was a useful check.

# Build distribution for active rows and compute step log-probs
# TODO: masking ctx with step_mask outside of compute_dist and log_probs,
# i.e., implement __getitem__ for ctx. (maybe we should contain only the
# tensors, and not additional metadata like the batch size, device, etc.)
Collaborator

Masking of ctx should already be handled. Or are you suggesting it should be handled in this logic here (i.e., generic)?

valid_step_actions.tensor, dist, ctx, step_mask, vectorized=False
)

# Pad back to full batch size.
Collaborator

Why did you remove this? It's important.
