diff --git a/src/graph.py b/src/graph.py index 7ffcc34..78de12e 100644 --- a/src/graph.py +++ b/src/graph.py @@ -4,10 +4,24 @@ PathFinder - finds paths through voice leading graphs. """ from __future__ import annotations +from dataclasses import dataclass import networkx as nx from random import choices, seed from typing import Iterator +from .path import Path + + +@dataclass +class Candidate: + """A candidate edge with raw factor scores.""" + + edge: tuple + edge_index: int + graph_node: "Chord" + scores: dict[str, float] + weight: float = 0.0 # computed later by _compute_weights + class PathFinder: """Finds paths through voice leading graphs.""" @@ -20,26 +34,27 @@ class PathFinder: start_chord: "Chord | None" = None, max_length: int = 100, weights_config: dict | None = None, - ) -> tuple[list["Chord"], list["Chord"]]: + ) -> Path: """Find a stochastic path through the graph. Returns: - Tuple of (path, graph_path) where: - - path: list of output Chord objects (transposed) - - graph_path: list of original graph Chord objects (untransposed) + Path object containing output chords, graph chords, and metadata """ + from .pitch import Pitch + from .chord import Chord + if weights_config is None: weights_config = self._default_weights_config() chord = self._initialize_chords(start_chord) if not chord or chord[0] is None or len(self.graph.nodes()) == 0: - return [], [] + return Path(chord[0] if chord else None, weights_config) original_chord = chord[0] graph_node = original_chord output_chord = original_chord - path = [output_chord] + path_obj = Path(original_chord, weights_config) last_graph_nodes = (graph_node,) graph_path = [graph_node] @@ -49,8 +64,6 @@ class PathFinder: # Mark start node as just visited node_visit_counts[graph_node] = 0 - from .pitch import Pitch - dims = output_chord.dims cumulative_trans = Pitch(tuple(0 for _ in range(len(dims))), dims) @@ -69,9 +82,10 @@ class PathFinder: if not out_edges: break - weights = self._calculate_edge_weights( + # Build candidates with raw scores + candidates = self._build_candidates( out_edges, - path, + path_obj.output_chords, last_graph_nodes, weights_config, tuple(voice_stay_count), @@ -80,10 +94,22 @@ class PathFinder: node_visit_counts, ) - edge = choices(out_edges, weights=weights)[0] - next_graph_node = edge[1] - trans = edge[2].get("transposition") - movement = edge[2].get("movements", {}) + # Compute weights from raw scores + self._compute_weights(candidates, weights_config) + + # Filter out candidates with zero weight + valid_candidates = [c for c in candidates if c.weight > 0] + if not valid_candidates: + break + + # Select using weighted choice + chosen = choices( + valid_candidates, weights=[c.weight for c in valid_candidates] + )[0] + + next_graph_node = chosen.graph_node + trans = chosen.edge[2].get("transposition") + movement = chosen.edge[2].get("movements", {}) new_voice_map = [None] * num_voices for src_idx, dest_idx in movement.items(): @@ -98,12 +124,24 @@ class PathFinder: reordered_pitches = tuple( transposed.pitches[voice_map[i]] for i in range(num_voices) ) - from .chord import Chord output_chord = Chord(reordered_pitches, dims) + # Collect all candidates' scores for storage + all_candidates_scores = [c.scores for c in candidates] + + # Add step to Path object + path_obj.add_step( + graph_node=next_graph_node, + output_chord=output_chord, + transposition=trans, + movements=movement, + scores=chosen.scores, + candidates=all_candidates_scores, + ) + for voice_idx in range(num_voices): - curr_cents = path[-1].pitches[voice_idx].to_cents() + curr_cents = path_obj.output_chords[-1].pitches[voice_idx].to_cents() next_cents = output_chord.pitches[voice_idx].to_cents() if curr_cents == next_cents: voice_stay_count[voice_idx] += 1 @@ -117,12 +155,125 @@ class PathFinder: if next_graph_node in node_visit_counts: node_visit_counts[next_graph_node] = 0 - path.append(output_chord) last_graph_nodes = last_graph_nodes + (graph_node,) if len(last_graph_nodes) > 2: last_graph_nodes = last_graph_nodes[-2:] - return path, graph_path + return path_obj + + def _build_candidates( + self, + out_edges: list, + path: list["Chord"], + last_chords: tuple["Chord", ...], + config: dict, + voice_stay_count: tuple[int, ...] | None, + graph_path: list["Chord"] | None, + cumulative_trans: "Pitch | None", + node_visit_counts: dict | None, + ) -> list["Candidate"]: + """Build candidates with raw factor scores only.""" + if not out_edges: + return [] + + candidates = [] + for i, edge in enumerate(out_edges): + edge_data = edge[2] + + # All factors - always compute verbatim + direct_tuning = self._factor_direct_tuning(edge_data, config) + voice_crossing = self._factor_voice_crossing(edge_data, config) + melodic = self._factor_melodic_threshold(edge_data, config) + contrary = self._factor_contrary_motion(edge_data, config) + hamiltonian = self._factor_dca_hamiltonian(edge, node_visit_counts, config) + dca_voice = self._factor_dca_voice_movement( + edge, path, voice_stay_count, config, cumulative_trans + ) + target = self._factor_target_range(edge, path, config, cumulative_trans) + + scores = { + "direct_tuning": direct_tuning, + "voice_crossing": voice_crossing, + "melodic_threshold": melodic, + "contrary_motion": contrary, + "dca_hamiltonian": hamiltonian, + "dca_voice_movement": dca_voice, + "target_range": target, + } + + candidates.append(Candidate(edge, i, edge[1], scores, 0.0)) + + return candidates + + def _compute_weights( + self, + candidates: list["Candidate"], + config: dict, + ) -> list[float]: + """Compute weights from raw scores for all candidates. + + Returns a list of weights, and updates each candidate's weight field. + """ + if not candidates: + return [] + + # Collect raw values for normalization + melodic_values = [c.scores.get("melodic_threshold", 0) for c in candidates] + contrary_values = [c.scores.get("contrary_motion", 0) for c in candidates] + hamiltonian_values = [c.scores.get("dca_hamiltonian", 0) for c in candidates] + dca_values = [c.scores.get("dca_voice_movement", 0) for c in candidates] + target_values = [c.scores.get("target_range", 0) for c in candidates] + + # Helper function for sum normalization + def sum_normalize(values: list) -> list | None: + """Normalize values to sum to 1. Returns None if no discrimination.""" + total = sum(values) + if total == 0 or len(set(values)) <= 1: + return None + return [v / total for v in values] + + # Sum normalize each factor + melodic_norm = sum_normalize(melodic_values) + contrary_norm = sum_normalize(contrary_values) + hamiltonian_norm = sum_normalize(hamiltonian_values) + dca_norm = sum_normalize(dca_values) + target_norm = sum_normalize(target_values) + + # Calculate weights for each candidate + weights = [] + for i, candidate in enumerate(candidates): + scores = candidate.scores + w = 1.0 + + # Hard factors (multiplicative - eliminates if 0) + w *= scores.get("direct_tuning", 0) + if w == 0: + candidate.weight = 0.0 + weights.append(0.0) + continue + + w *= scores.get("voice_crossing", 0) + if w == 0: + candidate.weight = 0.0 + weights.append(0.0) + continue + + # Soft factors (sum normalized, then weighted) + if melodic_norm: + w += melodic_norm[i] * config.get("weight_melodic", 1) + if contrary_norm: + w += contrary_norm[i] * config.get("weight_contrary_motion", 0) + if hamiltonian_norm: + w += hamiltonian_norm[i] * config.get("weight_dca_hamiltonian", 1) + if dca_norm: + w += dca_norm[i] * config.get("weight_dca_voice_movement", 1) + if target_norm: + w += target_norm[i] * config.get("weight_target_range", 1) + + candidate.weight = w + weights.append(w) + + return weights def _initialize_chords(self, start_chord: "Chord | None") -> tuple: """Initialize chord sequence.""" @@ -143,10 +294,10 @@ class PathFinder: if len(out_edges) == 0: continue - weights = self._calculate_edge_weights( - out_edges, [chord], (chord,), weights_config, None + candidates = self._build_candidates( + out_edges, [chord], (chord,), weights_config, None, None, None, None ) - nonzero = sum(1 for w in weights if w > 0) + nonzero = sum(1 for c in candidates if c.weight > 0) if nonzero > 0: return (chord,) @@ -169,112 +320,6 @@ class PathFinder: "target_range_octaves": 2.0, } - def _calculate_edge_weights( - self, - out_edges: list, - path: list["Chord"], - last_chords: tuple["Chord", ...], - config: dict, - voice_stay_count: tuple[int, ...] | None = None, - graph_path: list["Chord"] | None = None, - cumulative_trans: "Pitch | None" = None, - node_visit_counts: dict | None = None, - ) -> list[float]: - """Calculate weights for edges based on configuration. - - Uses hybrid approach: - - Hard factors (direct tuning, voice crossing): multiplication (eliminate if factor fails) - - Soft factors: sum normalized per factor, then weighted sum - """ - if not out_edges: - return [] - - # First pass: collect raw factor values for all edges - melodic_values = [] - contrary_values = [] - hamiltonian_values = [] - dca_values = [] - target_values = [] - - for edge in out_edges: - edge_data = edge[2] - - # Hard factors first (to filter invalid edges) - direct_tuning = self._factor_direct_tuning(edge_data, config) - voice_crossing = self._factor_voice_crossing(edge_data, config) - - # Skip if hard factors eliminate this edge - if direct_tuning == 0 or voice_crossing == 0: - melodic_values.append(0) - contrary_values.append(0) - hamiltonian_values.append(0) - dca_values.append(0) - target_values.append(0) - continue - - # Soft factors - melodic_values.append(self._factor_melodic_threshold(edge_data, config)) - contrary_values.append(self._factor_contrary_motion(edge_data, config)) - hamiltonian_values.append( - self._factor_dca_hamiltonian(edge, node_visit_counts, config) - ) - dca_values.append( - self._factor_dca_voice_movement( - edge, path, voice_stay_count, config, cumulative_trans - ) - ) - target_values.append( - self._factor_target_range(edge, path, config, cumulative_trans) - ) - - # Helper function for sum normalization - def sum_normalize(values: list) -> list | None: - """Normalize values to sum to 1. Returns None if no discrimination.""" - total = sum(values) - if total == 0 or len(set(values)) <= 1: - return None # no discrimination - return [v / total for v in values] - - # Sum normalize each factor - melodic_norm = sum_normalize(melodic_values) - contrary_norm = sum_normalize(contrary_values) - hamiltonian_norm = sum_normalize(hamiltonian_values) - dca_norm = sum_normalize(dca_values) - target_norm = sum_normalize(target_values) - - # Second pass: calculate final weights - weights = [] - for i, edge in enumerate(out_edges): - w = 1.0 # base weight - edge_data = edge[2] - - # Hard factors - w *= self._factor_direct_tuning(edge_data, config) - if w == 0: - weights.append(0) - continue - - w *= self._factor_voice_crossing(edge_data, config) - if w == 0: - weights.append(0) - continue - - # Soft factors (sum normalized, then weighted) - if melodic_norm: - w += melodic_norm[i] * config.get("weight_melodic", 1) - if contrary_norm: - w += contrary_norm[i] * config.get("weight_contrary_motion", 0) - if hamiltonian_norm: - w += hamiltonian_norm[i] * config.get("weight_dca_hamiltonian", 1) - if dca_norm: - w += dca_norm[i] * config.get("weight_dca_voice_movement", 1) - if target_norm: - w += target_norm[i] * config.get("weight_target_range", 1) - - weights.append(w) - - return weights - def _factor_melodic_threshold(self, edge_data: dict, config: dict) -> float: """Returns 1.0 if all voice movements are within melodic threshold, 0.0 otherwise.""" # Check weight - if 0, return 1.0 (neutral) diff --git a/src/io.py b/src/io.py index 44f6763..f5063ff 100644 --- a/src/io.py +++ b/src/io.py @@ -448,10 +448,10 @@ def main(): weights_config["max_path"] = args.max_path - path, graph_path = path_finder.find_stochastic_path( + path_obj = path_finder.find_stochastic_path( max_length=args.max_path, weights_config=weights_config ) - print(f"Path length: {len(path)}") + print(f"Path length: {len(path_obj)}") # Create output directory and write files import os @@ -461,22 +461,24 @@ def main(): # Save graph_path for Hamiltonian analysis import json - graph_path_data = [hash(node) for node in graph_path] + graph_path_data = [hash(node) for node in path_obj.graph_chords] graph_path_file = os.path.join(args.output_dir, "graph_path.json") with open(graph_path_file, "w") as f: json.dump(graph_path_data, f) print(f"Written to {graph_path_file}") - write_chord_sequence(path, os.path.join(args.output_dir, "output_chords.json")) + write_chord_sequence( + path_obj.output_chords, os.path.join(args.output_dir, "output_chords.json") + ) print(f"Written to {args.output_dir}/output_chords.json") write_chord_sequence_readable( - path, os.path.join(args.output_dir, "output_chords.txt") + path_obj.output_chords, os.path.join(args.output_dir, "output_chords.txt") ) print(f"Written to {args.output_dir}/output_chords.txt") write_chord_sequence_frequencies( - path, os.path.join(args.output_dir, "output_frequencies.txt") + path_obj.output_chords, os.path.join(args.output_dir, "output_frequencies.txt") ) print(f"Written to {args.output_dir}/output_frequencies.txt") diff --git a/src/path.py b/src/path.py new file mode 100644 index 0000000..e9d45fa --- /dev/null +++ b/src/path.py @@ -0,0 +1,76 @@ +#!/usr/bin/env python +""" +Path and PathStep classes for storing path state from PathFinder. +""" + +from __future__ import annotations +from dataclasses import dataclass, field +from typing import Any + +from .pitch import Pitch +from .chord import Chord + + +@dataclass +class PathStep: + """Stores data for a single step in the path.""" + + graph_node: Chord + output_chord: Chord + transposition: Pitch | None = None + movements: dict[int, int] = field(default_factory=dict) + scores: dict[str, float] = field(default_factory=dict) + candidates: list[dict[str, float]] = field(default_factory=list) + + +class Path: + """Stores the complete state of a generated path.""" + + def __init__( + self, initial_chord: Chord, weights_config: dict[str, Any] | None = None + ): + self.initial_chord = initial_chord + self.steps: list[PathStep] = [] + self.weights_config = weights_config if weights_config is not None else {} + + def add_step( + self, + graph_node: Chord, + output_chord: Chord, + transposition: Pitch | None = None, + movements: dict[int, int] | None = None, + scores: dict[str, float] | None = None, + candidates: list[dict[str, float]] | None = None, + ) -> None: + """Add a step to the path.""" + step = PathStep( + graph_node=graph_node, + output_chord=output_chord, + transposition=transposition, + movements=movements if movements is not None else {}, + scores=scores if scores is not None else {}, + candidates=candidates if candidates is not None else [], + ) + self.steps.append(step) + + @property + def graph_chords(self) -> list[Chord]: + """Get list of graph nodes (original chords).""" + return [self.initial_chord] + [step.graph_node for step in self.steps] + + @property + def output_chords(self) -> list[Chord]: + """Get list of output chords (transposed).""" + return [self.initial_chord] + [step.output_chord for step in self.steps] + + def __len__(self) -> int: + """Total number of chords in path.""" + return len(self.steps) + 1 + + def __iter__(self): + """Iterate over output chords.""" + return iter(self.output_chords) + + def __getitem__(self, index: int) -> Chord: + """Get output chord by index.""" + return self.output_chords[index]