From f2b785c98e7dd17e73ec46d06d0cbe8154306c73 Mon Sep 17 00:00:00 2001 From: Michael Winter Date: Mon, 16 Mar 2026 01:11:15 +0100 Subject: [PATCH] Encapsulate voice-leading in Path.step() - Move transposition, voice mapping into Path.step() - Add node_visit_counts and voice_stay_count to each PathStep - Fix voice tracking to use voice identity, not position - Clean up unused last_graph_nodes code - Full encapsulation: Path handles all voice-leading state --- src/graph.py | 96 +++++++---------------------------------- src/path.py | 118 ++++++++++++++++++++++++++++++++++++++++++++------- 2 files changed, 118 insertions(+), 96 deletions(-) diff --git a/src/graph.py b/src/graph.py index 78de12e..f2db416 100644 --- a/src/graph.py +++ b/src/graph.py @@ -40,9 +40,6 @@ class PathFinder: Returns: Path object containing output chords, graph chords, and metadata """ - from .pitch import Pitch - from .chord import Chord - if weights_config is None: weights_config = self._default_weights_config() @@ -51,32 +48,15 @@ class PathFinder: return Path(chord[0] if chord else None, weights_config) original_chord = chord[0] - graph_node = original_chord - output_chord = original_chord path_obj = Path(original_chord, weights_config) - last_graph_nodes = (graph_node,) + path_obj.init_state( + set(self.graph.nodes()), len(original_chord.pitches), original_chord + ) - graph_path = [graph_node] - - # Track how long since each node was last visited (for DCA Hamiltonian) - node_visit_counts = {node: 0 for node in self.graph.nodes()} - # Mark start node as just visited - node_visit_counts[graph_node] = 0 - - dims = output_chord.dims - cumulative_trans = Pitch(tuple(0 for _ in range(len(dims))), dims) - - num_voices = len(output_chord.pitches) - voice_map = list(range(num_voices)) - - voice_stay_count = [0] * num_voices + graph_node = original_chord for _ in range(max_length): - # Increment all node visit counts - for node in node_visit_counts: - node_visit_counts[node] += 1 - out_edges = list(self.graph.out_edges(graph_node, data=True)) if not out_edges: @@ -86,12 +66,11 @@ class PathFinder: candidates = self._build_candidates( out_edges, path_obj.output_chords, - last_graph_nodes, weights_config, - tuple(voice_stay_count), - graph_path, - cumulative_trans, - node_visit_counts, + tuple(path_obj._voice_stay_count), + path_obj.graph_chords, + path_obj._cumulative_trans, + path_obj._node_visit_counts, ) # Compute weights from raw scores @@ -107,57 +86,15 @@ class PathFinder: valid_candidates, weights=[c.weight for c in valid_candidates] )[0] - next_graph_node = chosen.graph_node - trans = chosen.edge[2].get("transposition") - movement = chosen.edge[2].get("movements", {}) - - new_voice_map = [None] * num_voices - for src_idx, dest_idx in movement.items(): - new_voice_map[dest_idx] = voice_map[src_idx] - voice_map = new_voice_map - - if trans is not None: - cumulative_trans = cumulative_trans.transpose(trans) - - transposed = next_graph_node.transpose(cumulative_trans) - - reordered_pitches = tuple( - transposed.pitches[voice_map[i]] for i in range(num_voices) + # Use path.step() to handle all voice-leading and state updates + path_obj.step( + graph_node=chosen.graph_node, + edge_data=chosen.edge[2], + candidates=candidates, + chosen_scores=chosen.scores, ) - output_chord = Chord(reordered_pitches, dims) - - # Collect all candidates' scores for storage - all_candidates_scores = [c.scores for c in candidates] - - # Add step to Path object - path_obj.add_step( - graph_node=next_graph_node, - output_chord=output_chord, - transposition=trans, - movements=movement, - scores=chosen.scores, - candidates=all_candidates_scores, - ) - - for voice_idx in range(num_voices): - curr_cents = path_obj.output_chords[-1].pitches[voice_idx].to_cents() - next_cents = output_chord.pitches[voice_idx].to_cents() - if curr_cents == next_cents: - voice_stay_count[voice_idx] += 1 - else: - voice_stay_count[voice_idx] = 0 - - graph_node = next_graph_node - graph_path.append(graph_node) - - # Reset visit count for visited node - if next_graph_node in node_visit_counts: - node_visit_counts[next_graph_node] = 0 - - last_graph_nodes = last_graph_nodes + (graph_node,) - if len(last_graph_nodes) > 2: - last_graph_nodes = last_graph_nodes[-2:] + graph_node = chosen.graph_node return path_obj @@ -165,7 +102,6 @@ class PathFinder: self, out_edges: list, path: list["Chord"], - last_chords: tuple["Chord", ...], config: dict, voice_stay_count: tuple[int, ...] | None, graph_path: list["Chord"] | None, @@ -295,7 +231,7 @@ class PathFinder: continue candidates = self._build_candidates( - out_edges, [chord], (chord,), weights_config, None, None, None, None + out_edges, [chord], weights_config, None, None, None, None ) nonzero = sum(1 for c in candidates if c.weight > 0) diff --git a/src/path.py b/src/path.py index e9d45fa..bea2c58 100644 --- a/src/path.py +++ b/src/path.py @@ -5,11 +5,14 @@ Path and PathStep classes for storing path state from PathFinder. from __future__ import annotations from dataclasses import dataclass, field -from typing import Any +from typing import TYPE_CHECKING, Any from .pitch import Pitch from .chord import Chord +if TYPE_CHECKING: + from .graph import Candidate + @dataclass class PathStep: @@ -20,38 +23,121 @@ class PathStep: transposition: Pitch | None = None movements: dict[int, int] = field(default_factory=dict) scores: dict[str, float] = field(default_factory=dict) - candidates: list[dict[str, float]] = field(default_factory=list) + candidates: list["Candidate"] = field(default_factory=list) + node_visit_counts: dict | None = None + voice_stay_count: tuple[int, ...] | None = None class Path: """Stores the complete state of a generated path.""" def __init__( - self, initial_chord: Chord, weights_config: dict[str, Any] | None = None + self, initial_chord: Chord | None, weights_config: dict[str, Any] | None = None ): self.initial_chord = initial_chord self.steps: list[PathStep] = [] self.weights_config = weights_config if weights_config is not None else {} - def add_step( - self, - graph_node: Chord, - output_chord: Chord, - transposition: Pitch | None = None, - movements: dict[int, int] | None = None, - scores: dict[str, float] | None = None, - candidates: list[dict[str, float]] | None = None, + # State for tracking + self._node_visit_counts: dict = {} + self._voice_stay_count: list[int] = [] + self._voice_map: list[int] = [] # which voice is at each position + self._cumulative_trans: Pitch | None = None # cumulative transposition + + def init_state( + self, graph_nodes: set, num_voices: int, initial_chord: Chord ) -> None: - """Add a step to the path.""" + """Initialize state after graph is known.""" + self._node_visit_counts = {node: 0 for node in graph_nodes} + self._node_visit_counts[initial_chord] = 0 + self._voice_stay_count = [0] * num_voices + self._voice_map = list(range(num_voices)) # voice i at position i + + dims = initial_chord.dims + self._cumulative_trans = Pitch(tuple(0 for _ in range(len(dims))), dims) + + def step( + self, + graph_node: "Chord", + edge_data: dict, + candidates: list["Candidate"], + chosen_scores: dict[str, float] | None = None, + ) -> PathStep: + """Process a step: update state, compute output, return step. + + Takes graph_node and edge_data, handles all voice-leading internally. + """ + # Get edge information + trans = edge_data.get("transposition") + movement = edge_data.get("movements", {}) + + # Update cumulative transposition + if trans is not None: + self._cumulative_trans = self._cumulative_trans.transpose(trans) + + # Transpose the graph node + transposed = graph_node.transpose(self._cumulative_trans) + + # Update voice map based on movement + new_voice_map = [None] * len(self._voice_map) + for src_idx, dest_idx in movement.items(): + new_voice_map[dest_idx] = self._voice_map[src_idx] + self._voice_map = new_voice_map + + # Reorder pitches according to voice map + reordered_pitches = tuple( + transposed.pitches[self._voice_map[i]] for i in range(len(self._voice_map)) + ) + output_chord = Chord(reordered_pitches, graph_node.dims) + + # Get previous output chord + prev_output_chord = self.output_chords[-1] + + # Increment all node visit counts + for node in self._node_visit_counts: + self._node_visit_counts[node] += 1 + + # Update voice stay counts (comparing same voice, not position) + for voice_idx in range(len(self._voice_stay_count)): + # Find which position this voice was at in previous chord + prev_voice_pos = None + for pos, voice in enumerate(self._voice_map): + if voice == voice_idx: + prev_voice_pos = pos + break + + # Current position of this voice + curr_voice_pos = voice_idx + + if prev_voice_pos is not None: + prev_cents = prev_output_chord.pitches[prev_voice_pos].to_cents() + else: + prev_cents = None + + curr_cents = output_chord.pitches[curr_voice_pos].to_cents() + + if prev_cents is not None and prev_cents == curr_cents: + self._voice_stay_count[voice_idx] += 1 + else: + self._voice_stay_count[voice_idx] = 0 + + # Create step with current state step = PathStep( graph_node=graph_node, output_chord=output_chord, - transposition=transposition, - movements=movements if movements is not None else {}, - scores=scores if scores is not None else {}, - candidates=candidates if candidates is not None else [], + transposition=trans, + movements=movement, + scores=chosen_scores if chosen_scores is not None else {}, + candidates=candidates, + node_visit_counts=dict(self._node_visit_counts), + voice_stay_count=tuple(self._voice_stay_count), ) + + # Reset visit count for this node + self._node_visit_counts[graph_node] = 0 + self.steps.append(step) + return step @property def graph_chords(self) -> list[Chord]: