Add Path and Candidate classes for path state tracking

- Add src/path.py with Path and PathStep classes
- Path stores initial_chord, steps, weights_config
- PathStep stores graph_node, output_chord, transposition, movements, scores, candidates
- Refactor find_stochastic_path to use candidates approach
- Separate _build_candidates (raw scores) from _compute_weights
- Simplify return type to Path only (graph_chords available via property)
- Update io.py to use new Path API
This commit is contained in:
Michael Winter 2026-03-16 00:39:32 +01:00
parent b20f02b60f
commit 66669de00f
3 changed files with 256 additions and 133 deletions

View file

@ -4,10 +4,24 @@ PathFinder - finds paths through voice leading graphs.
"""
from __future__ import annotations
from dataclasses import dataclass
import networkx as nx
from random import choices, seed
from typing import Iterator
from .path import Path
@dataclass
class Candidate:
"""A candidate edge with raw factor scores."""
edge: tuple
edge_index: int
graph_node: "Chord"
scores: dict[str, float]
weight: float = 0.0 # computed later by _compute_weights
class PathFinder:
"""Finds paths through voice leading graphs."""
@ -20,26 +34,27 @@ class PathFinder:
start_chord: "Chord | None" = None,
max_length: int = 100,
weights_config: dict | None = None,
) -> tuple[list["Chord"], list["Chord"]]:
) -> Path:
"""Find a stochastic path through the graph.
Returns:
Tuple of (path, graph_path) where:
- path: list of output Chord objects (transposed)
- graph_path: list of original graph Chord objects (untransposed)
Path object containing output chords, graph chords, and metadata
"""
from .pitch import Pitch
from .chord import Chord
if weights_config is None:
weights_config = self._default_weights_config()
chord = self._initialize_chords(start_chord)
if not chord or chord[0] is None or len(self.graph.nodes()) == 0:
return [], []
return Path(chord[0] if chord else None, weights_config)
original_chord = chord[0]
graph_node = original_chord
output_chord = original_chord
path = [output_chord]
path_obj = Path(original_chord, weights_config)
last_graph_nodes = (graph_node,)
graph_path = [graph_node]
@ -49,8 +64,6 @@ class PathFinder:
# Mark start node as just visited
node_visit_counts[graph_node] = 0
from .pitch import Pitch
dims = output_chord.dims
cumulative_trans = Pitch(tuple(0 for _ in range(len(dims))), dims)
@ -69,9 +82,10 @@ class PathFinder:
if not out_edges:
break
weights = self._calculate_edge_weights(
# Build candidates with raw scores
candidates = self._build_candidates(
out_edges,
path,
path_obj.output_chords,
last_graph_nodes,
weights_config,
tuple(voice_stay_count),
@ -80,10 +94,22 @@ class PathFinder:
node_visit_counts,
)
edge = choices(out_edges, weights=weights)[0]
next_graph_node = edge[1]
trans = edge[2].get("transposition")
movement = edge[2].get("movements", {})
# Compute weights from raw scores
self._compute_weights(candidates, weights_config)
# Filter out candidates with zero weight
valid_candidates = [c for c in candidates if c.weight > 0]
if not valid_candidates:
break
# Select using weighted choice
chosen = choices(
valid_candidates, weights=[c.weight for c in valid_candidates]
)[0]
next_graph_node = chosen.graph_node
trans = chosen.edge[2].get("transposition")
movement = chosen.edge[2].get("movements", {})
new_voice_map = [None] * num_voices
for src_idx, dest_idx in movement.items():
@ -98,12 +124,24 @@ class PathFinder:
reordered_pitches = tuple(
transposed.pitches[voice_map[i]] for i in range(num_voices)
)
from .chord import Chord
output_chord = Chord(reordered_pitches, dims)
# Collect all candidates' scores for storage
all_candidates_scores = [c.scores for c in candidates]
# Add step to Path object
path_obj.add_step(
graph_node=next_graph_node,
output_chord=output_chord,
transposition=trans,
movements=movement,
scores=chosen.scores,
candidates=all_candidates_scores,
)
for voice_idx in range(num_voices):
curr_cents = path[-1].pitches[voice_idx].to_cents()
curr_cents = path_obj.output_chords[-1].pitches[voice_idx].to_cents()
next_cents = output_chord.pitches[voice_idx].to_cents()
if curr_cents == next_cents:
voice_stay_count[voice_idx] += 1
@ -117,12 +155,125 @@ class PathFinder:
if next_graph_node in node_visit_counts:
node_visit_counts[next_graph_node] = 0
path.append(output_chord)
last_graph_nodes = last_graph_nodes + (graph_node,)
if len(last_graph_nodes) > 2:
last_graph_nodes = last_graph_nodes[-2:]
return path, graph_path
return path_obj
def _build_candidates(
self,
out_edges: list,
path: list["Chord"],
last_chords: tuple["Chord", ...],
config: dict,
voice_stay_count: tuple[int, ...] | None,
graph_path: list["Chord"] | None,
cumulative_trans: "Pitch | None",
node_visit_counts: dict | None,
) -> list["Candidate"]:
"""Build candidates with raw factor scores only."""
if not out_edges:
return []
candidates = []
for i, edge in enumerate(out_edges):
edge_data = edge[2]
# All factors - always compute verbatim
direct_tuning = self._factor_direct_tuning(edge_data, config)
voice_crossing = self._factor_voice_crossing(edge_data, config)
melodic = self._factor_melodic_threshold(edge_data, config)
contrary = self._factor_contrary_motion(edge_data, config)
hamiltonian = self._factor_dca_hamiltonian(edge, node_visit_counts, config)
dca_voice = self._factor_dca_voice_movement(
edge, path, voice_stay_count, config, cumulative_trans
)
target = self._factor_target_range(edge, path, config, cumulative_trans)
scores = {
"direct_tuning": direct_tuning,
"voice_crossing": voice_crossing,
"melodic_threshold": melodic,
"contrary_motion": contrary,
"dca_hamiltonian": hamiltonian,
"dca_voice_movement": dca_voice,
"target_range": target,
}
candidates.append(Candidate(edge, i, edge[1], scores, 0.0))
return candidates
def _compute_weights(
self,
candidates: list["Candidate"],
config: dict,
) -> list[float]:
"""Compute weights from raw scores for all candidates.
Returns a list of weights, and updates each candidate's weight field.
"""
if not candidates:
return []
# Collect raw values for normalization
melodic_values = [c.scores.get("melodic_threshold", 0) for c in candidates]
contrary_values = [c.scores.get("contrary_motion", 0) for c in candidates]
hamiltonian_values = [c.scores.get("dca_hamiltonian", 0) for c in candidates]
dca_values = [c.scores.get("dca_voice_movement", 0) for c in candidates]
target_values = [c.scores.get("target_range", 0) for c in candidates]
# Helper function for sum normalization
def sum_normalize(values: list) -> list | None:
"""Normalize values to sum to 1. Returns None if no discrimination."""
total = sum(values)
if total == 0 or len(set(values)) <= 1:
return None
return [v / total for v in values]
# Sum normalize each factor
melodic_norm = sum_normalize(melodic_values)
contrary_norm = sum_normalize(contrary_values)
hamiltonian_norm = sum_normalize(hamiltonian_values)
dca_norm = sum_normalize(dca_values)
target_norm = sum_normalize(target_values)
# Calculate weights for each candidate
weights = []
for i, candidate in enumerate(candidates):
scores = candidate.scores
w = 1.0
# Hard factors (multiplicative - eliminates if 0)
w *= scores.get("direct_tuning", 0)
if w == 0:
candidate.weight = 0.0
weights.append(0.0)
continue
w *= scores.get("voice_crossing", 0)
if w == 0:
candidate.weight = 0.0
weights.append(0.0)
continue
# Soft factors (sum normalized, then weighted)
if melodic_norm:
w += melodic_norm[i] * config.get("weight_melodic", 1)
if contrary_norm:
w += contrary_norm[i] * config.get("weight_contrary_motion", 0)
if hamiltonian_norm:
w += hamiltonian_norm[i] * config.get("weight_dca_hamiltonian", 1)
if dca_norm:
w += dca_norm[i] * config.get("weight_dca_voice_movement", 1)
if target_norm:
w += target_norm[i] * config.get("weight_target_range", 1)
candidate.weight = w
weights.append(w)
return weights
def _initialize_chords(self, start_chord: "Chord | None") -> tuple:
"""Initialize chord sequence."""
@ -143,10 +294,10 @@ class PathFinder:
if len(out_edges) == 0:
continue
weights = self._calculate_edge_weights(
out_edges, [chord], (chord,), weights_config, None
candidates = self._build_candidates(
out_edges, [chord], (chord,), weights_config, None, None, None, None
)
nonzero = sum(1 for w in weights if w > 0)
nonzero = sum(1 for c in candidates if c.weight > 0)
if nonzero > 0:
return (chord,)
@ -169,112 +320,6 @@ class PathFinder:
"target_range_octaves": 2.0,
}
def _calculate_edge_weights(
self,
out_edges: list,
path: list["Chord"],
last_chords: tuple["Chord", ...],
config: dict,
voice_stay_count: tuple[int, ...] | None = None,
graph_path: list["Chord"] | None = None,
cumulative_trans: "Pitch | None" = None,
node_visit_counts: dict | None = None,
) -> list[float]:
"""Calculate weights for edges based on configuration.
Uses hybrid approach:
- Hard factors (direct tuning, voice crossing): multiplication (eliminate if factor fails)
- Soft factors: sum normalized per factor, then weighted sum
"""
if not out_edges:
return []
# First pass: collect raw factor values for all edges
melodic_values = []
contrary_values = []
hamiltonian_values = []
dca_values = []
target_values = []
for edge in out_edges:
edge_data = edge[2]
# Hard factors first (to filter invalid edges)
direct_tuning = self._factor_direct_tuning(edge_data, config)
voice_crossing = self._factor_voice_crossing(edge_data, config)
# Skip if hard factors eliminate this edge
if direct_tuning == 0 or voice_crossing == 0:
melodic_values.append(0)
contrary_values.append(0)
hamiltonian_values.append(0)
dca_values.append(0)
target_values.append(0)
continue
# Soft factors
melodic_values.append(self._factor_melodic_threshold(edge_data, config))
contrary_values.append(self._factor_contrary_motion(edge_data, config))
hamiltonian_values.append(
self._factor_dca_hamiltonian(edge, node_visit_counts, config)
)
dca_values.append(
self._factor_dca_voice_movement(
edge, path, voice_stay_count, config, cumulative_trans
)
)
target_values.append(
self._factor_target_range(edge, path, config, cumulative_trans)
)
# Helper function for sum normalization
def sum_normalize(values: list) -> list | None:
"""Normalize values to sum to 1. Returns None if no discrimination."""
total = sum(values)
if total == 0 or len(set(values)) <= 1:
return None # no discrimination
return [v / total for v in values]
# Sum normalize each factor
melodic_norm = sum_normalize(melodic_values)
contrary_norm = sum_normalize(contrary_values)
hamiltonian_norm = sum_normalize(hamiltonian_values)
dca_norm = sum_normalize(dca_values)
target_norm = sum_normalize(target_values)
# Second pass: calculate final weights
weights = []
for i, edge in enumerate(out_edges):
w = 1.0 # base weight
edge_data = edge[2]
# Hard factors
w *= self._factor_direct_tuning(edge_data, config)
if w == 0:
weights.append(0)
continue
w *= self._factor_voice_crossing(edge_data, config)
if w == 0:
weights.append(0)
continue
# Soft factors (sum normalized, then weighted)
if melodic_norm:
w += melodic_norm[i] * config.get("weight_melodic", 1)
if contrary_norm:
w += contrary_norm[i] * config.get("weight_contrary_motion", 0)
if hamiltonian_norm:
w += hamiltonian_norm[i] * config.get("weight_dca_hamiltonian", 1)
if dca_norm:
w += dca_norm[i] * config.get("weight_dca_voice_movement", 1)
if target_norm:
w += target_norm[i] * config.get("weight_target_range", 1)
weights.append(w)
return weights
def _factor_melodic_threshold(self, edge_data: dict, config: dict) -> float:
"""Returns 1.0 if all voice movements are within melodic threshold, 0.0 otherwise."""
# Check weight - if 0, return 1.0 (neutral)

View file

@ -448,10 +448,10 @@ def main():
weights_config["max_path"] = args.max_path
path, graph_path = path_finder.find_stochastic_path(
path_obj = path_finder.find_stochastic_path(
max_length=args.max_path, weights_config=weights_config
)
print(f"Path length: {len(path)}")
print(f"Path length: {len(path_obj)}")
# Create output directory and write files
import os
@ -461,22 +461,24 @@ def main():
# Save graph_path for Hamiltonian analysis
import json
graph_path_data = [hash(node) for node in graph_path]
graph_path_data = [hash(node) for node in path_obj.graph_chords]
graph_path_file = os.path.join(args.output_dir, "graph_path.json")
with open(graph_path_file, "w") as f:
json.dump(graph_path_data, f)
print(f"Written to {graph_path_file}")
write_chord_sequence(path, os.path.join(args.output_dir, "output_chords.json"))
write_chord_sequence(
path_obj.output_chords, os.path.join(args.output_dir, "output_chords.json")
)
print(f"Written to {args.output_dir}/output_chords.json")
write_chord_sequence_readable(
path, os.path.join(args.output_dir, "output_chords.txt")
path_obj.output_chords, os.path.join(args.output_dir, "output_chords.txt")
)
print(f"Written to {args.output_dir}/output_chords.txt")
write_chord_sequence_frequencies(
path, os.path.join(args.output_dir, "output_frequencies.txt")
path_obj.output_chords, os.path.join(args.output_dir, "output_frequencies.txt")
)
print(f"Written to {args.output_dir}/output_frequencies.txt")

76
src/path.py Normal file
View file

@ -0,0 +1,76 @@
#!/usr/bin/env python
"""
Path and PathStep classes for storing path state from PathFinder.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any
from .pitch import Pitch
from .chord import Chord
@dataclass
class PathStep:
"""Stores data for a single step in the path."""
graph_node: Chord
output_chord: Chord
transposition: Pitch | None = None
movements: dict[int, int] = field(default_factory=dict)
scores: dict[str, float] = field(default_factory=dict)
candidates: list[dict[str, float]] = field(default_factory=list)
class Path:
"""Stores the complete state of a generated path."""
def __init__(
self, initial_chord: Chord, weights_config: dict[str, Any] | None = None
):
self.initial_chord = initial_chord
self.steps: list[PathStep] = []
self.weights_config = weights_config if weights_config is not None else {}
def add_step(
self,
graph_node: Chord,
output_chord: Chord,
transposition: Pitch | None = None,
movements: dict[int, int] | None = None,
scores: dict[str, float] | None = None,
candidates: list[dict[str, float]] | None = None,
) -> None:
"""Add a step to the path."""
step = PathStep(
graph_node=graph_node,
output_chord=output_chord,
transposition=transposition,
movements=movements if movements is not None else {},
scores=scores if scores is not None else {},
candidates=candidates if candidates is not None else [],
)
self.steps.append(step)
@property
def graph_chords(self) -> list[Chord]:
"""Get list of graph nodes (original chords)."""
return [self.initial_chord] + [step.graph_node for step in self.steps]
@property
def output_chords(self) -> list[Chord]:
"""Get list of output chords (transposed)."""
return [self.initial_chord] + [step.output_chord for step in self.steps]
def __len__(self) -> int:
"""Total number of chords in path."""
return len(self.steps) + 1
def __iter__(self):
"""Iterate over output chords."""
return iter(self.output_chords)
def __getitem__(self, index: int) -> Chord:
"""Get output chord by index."""
return self.output_chords[index]