From 861d012a95a7360111e49e42f48172971c1e88fc Mon Sep 17 00:00:00 2001 From: Michael Winter Date: Mon, 16 Mar 2026 16:53:22 +0100 Subject: [PATCH] Refactor: Unify Candidate and PathStep, fix DCA Hamiltonian - Remove Candidate class, use PathStep for both hypothetical and actual steps - Simplify Path.step() to accept a PathStep - Fix DCA Hamiltonian to return visit_count directly instead of normalized score - Tests pass and DCA properly discriminates --- src/path.py | 72 +++++++++----------------------- src/pathfinder.py | 102 ++++++++++++++++++++++++++-------------------- 2 files changed, 77 insertions(+), 97 deletions(-) diff --git a/src/path.py b/src/path.py index 401abf2..637489c 100644 --- a/src/path.py +++ b/src/path.py @@ -5,14 +5,11 @@ Path and PathStep classes for storing path state from PathFinder. from __future__ import annotations from dataclasses import dataclass, field -from typing import TYPE_CHECKING, Any +from typing import Any from .pitch import Pitch from .chord import Chord -if TYPE_CHECKING: - from .pathfinder import Candidate - @dataclass class PathStep: @@ -25,7 +22,7 @@ class PathStep: transposition: Pitch | None = None movements: dict[int, int] = field(default_factory=dict) scores: dict[str, float] = field(default_factory=dict) - candidates: list["Candidate"] = field(default_factory=list) + weight: float = 0.0 # computed later by _compute_weights last_visited_count_before: dict | None = None last_visited_count_after: dict | None = None sustain_count_before: tuple[int, ...] | None = None @@ -79,45 +76,24 @@ class Path: # Initialize fresh: all voices start at 0 return tuple(0 for _ in range(self._num_voices)) - def step( - self, - edge: tuple, - candidates: list["Candidate"], - chosen_scores: dict[str, float] | None = None, - ) -> PathStep: - """Process a step: update state, compute output, return step. + def step(self, step: PathStep) -> PathStep: + """Add a completed step to the path. - Takes edge (source_node, destination_node, edge_data), handles all voice-leading internally. + Takes a PathStep (computed as a hypothetical step), updates internal state, + and adds it to the path. """ - source_node = edge[0] - destination_node = edge[1] - edge_data = edge[2] - - trans = edge_data.get("transposition") - movement = edge_data.get("movements", {}) - # Update cumulative transposition - if trans is not None: - self._cumulative_trans = self._cumulative_trans.transpose(trans) - - # Transpose the destination node - transposed = destination_node.transpose(self._cumulative_trans) + if step.transposition is not None: + self._cumulative_trans = self._cumulative_trans.transpose( + step.transposition + ) # Update voice map based on movement new_voice_map = [None] * len(self._voice_map) - for src_idx, dest_idx in movement.items(): + for src_idx, dest_idx in step.movements.items(): new_voice_map[dest_idx] = self._voice_map[src_idx] self._voice_map = new_voice_map - # Reorder pitches according to voice map - reordered_pitches = tuple( - transposed.pitches[self._voice_map[i]] for i in range(len(self._voice_map)) - ) - destination_chord = Chord(reordered_pitches, destination_node.dims) - - # Get previous output chord - source_chord = self.output_chords[-1] - # Get BEFORE state from last step (or initialize fresh) last_visited_before = self._get_last_visited_counts() sustain_before = self._get_sustain_counts() @@ -126,32 +102,22 @@ class Path: last_visited_after = dict(last_visited_before) for node in last_visited_after: last_visited_after[node] += 1 - last_visited_after[destination_node] = 0 + last_visited_after[step.destination_node] = 0 sustain_after = list(sustain_before) for voice_idx in range(len(sustain_after)): - curr_cents = source_chord.pitches[voice_idx].to_cents() - next_cents = destination_chord.pitches[voice_idx].to_cents() + curr_cents = step.source_chord.pitches[voice_idx].to_cents() + next_cents = step.destination_chord.pitches[voice_idx].to_cents() if curr_cents == next_cents: sustain_after[voice_idx] += 1 else: sustain_after[voice_idx] = 0 - # Create step with before and after state - step = PathStep( - source_node=source_node, - destination_node=destination_node, - source_chord=source_chord, - destination_chord=destination_chord, - transposition=trans, - movements=movement, - scores=chosen_scores if chosen_scores is not None else {}, - candidates=candidates, - last_visited_count_before=last_visited_before, - last_visited_count_after=last_visited_after, - sustain_count_before=sustain_before, - sustain_count_after=tuple(sustain_after), - ) + # Update step with computed state + step.last_visited_count_before = last_visited_before + step.last_visited_count_after = last_visited_after + step.sustain_count_before = sustain_before + step.sustain_count_after = tuple(sustain_after) self.steps.append(step) return step diff --git a/src/pathfinder.py b/src/pathfinder.py index be0cc1e..9ec77d8 100644 --- a/src/pathfinder.py +++ b/src/pathfinder.py @@ -4,23 +4,12 @@ PathFinder - finds paths through voice leading graphs. """ from __future__ import annotations -from dataclasses import dataclass import networkx as nx from random import choices, seed -from typing import Callable, Iterator +from typing import Callable -from .path import Path - - -@dataclass -class Candidate: - """A candidate edge with raw factor scores.""" - - edge: tuple - edge_index: int - graph_node: "Chord" - scores: dict[str, float] - weight: float = 0.0 # computed later by _compute_weights +from .chord import Chord +from .path import Path, PathStep class PathFinder: @@ -100,13 +89,9 @@ class PathFinder: )[0] # Use path.step() to handle all voice-leading and state updates - path_obj.step( - edge=chosen.edge, - candidates=candidates, - chosen_scores=chosen.scores, - ) + path_obj.step(chosen) - graph_node = chosen.graph_node + graph_node = chosen.destination_node step_num += 1 # Invoke callback if configured @@ -124,15 +109,43 @@ class PathFinder: graph_path: list["Chord"] | None, cumulative_trans: "Pitch | None", node_visit_counts: dict | None, - ) -> list["Candidate"]: - """Build candidates with raw factor scores only.""" + ) -> list[PathStep]: + """Build hypothetical path steps with raw factor scores.""" if not out_edges: return [] + if not path: + return [] + + source_chord = path[-1] candidates = [] for i, edge in enumerate(out_edges): + source_node = edge[0] + destination_node = edge[1] edge_data = edge[2] + trans = edge_data.get("transposition") + movement = edge_data.get("movements", {}) + + # Transpose destination node + if trans is not None and cumulative_trans is not None: + transposed = destination_node.transpose( + cumulative_trans.transpose(trans) + ) + else: + transposed = destination_node + + # Apply voice map + voice_map = list(range(len(source_chord.pitches))) + new_voice_map = [None] * len(voice_map) + for src_idx, dest_idx in movement.items(): + new_voice_map[dest_idx] = voice_map[src_idx] + + reordered_pitches = tuple( + transposed.pitches[new_voice_map[i]] for i in range(len(new_voice_map)) + ) + destination_chord = Chord(reordered_pitches, destination_node.dims) + # All factors - always compute verbatim direct_tuning = self._factor_direct_tuning(edge_data, config) voice_crossing = self._factor_voice_crossing(edge_data, config) @@ -154,18 +167,27 @@ class PathFinder: "target_range": target, } - candidates.append(Candidate(edge, i, edge[1], scores, 0.0)) + step = PathStep( + source_node=source_node, + destination_node=destination_node, + source_chord=source_chord, + destination_chord=destination_chord, + transposition=trans, + movements=movement, + scores=scores, + ) + candidates.append(step) return candidates def _compute_weights( self, - candidates: list["Candidate"], + candidates: list[PathStep], config: dict, ) -> list[float]: """Compute weights from raw scores for all candidates. - Returns a list of weights, and updates each candidate's weight field. + Returns a list of weights, and updates each step's weight field. """ if not candidates: return [] @@ -194,20 +216,20 @@ class PathFinder: # Calculate weights for each candidate weights = [] - for i, candidate in enumerate(candidates): - scores = candidate.scores + for i, step in enumerate(candidates): + scores = step.scores w = 1.0 # Hard factors (multiplicative - eliminates if 0) w *= scores.get("direct_tuning", 0) if w == 0: - candidate.weight = 0.0 + step.weight = 0.0 weights.append(0.0) continue w *= scores.get("voice_crossing", 0) if w == 0: - candidate.weight = 0.0 + step.weight = 0.0 weights.append(0.0) continue @@ -223,7 +245,7 @@ class PathFinder: if target_norm: w += target_norm[i] * config.get("weight_target_range", 1) - candidate.weight = w + step.weight = w weights.append(w) return weights @@ -353,29 +375,21 @@ class PathFinder: def _factor_dca_hamiltonian( self, edge: tuple, node_visit_counts: dict | None, config: dict ) -> float: - """Returns probability based on how long since node was last visited. + """Returns score based on how long since node was last visited. - DCA Hamiltonian: longer since visited = higher probability. - Similar to DCA voice movement but for graph nodes. + DCA Hamiltonian: longer since visited = higher score. """ if config.get("weight_dca_hamiltonian", 1) == 0: return 1.0 if node_visit_counts is None: - return 1.0 + return 0.0 destination = edge[1] - if destination not in node_visit_counts: - return 1.0 + visit_count = node_visit_counts.get(destination, 0) - visit_count = node_visit_counts[destination] - max_count = max(node_visit_counts.values()) if node_visit_counts else 0 - - if max_count == 0: - return 1.0 - - # Normalize by max squared - gives stronger discrimination - return visit_count / (max_count**2) + # Return the visit count - higher is better (more steps since last visit) + return float(visit_count) def _factor_dca_voice_movement( self,