compact_sets/src/path.py

495 lines
18 KiB
Python
Raw Normal View History

#!/usr/bin/env python
"""
Path and PathStep classes for storing path state from PathFinder.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any
from .pitch import Pitch
from .chord import Chord
@dataclass
class PathStep:
    """Record of one candidate or committed step (edge) in a path.

    Field order is part of the constructor interface and must not change.
    Container fields use ``default_factory`` so instances never share a
    mutable default.
    """

    # --- Endpoints -------------------------------------------------------
    # Graph nodes at either end of the edge, as stored in the graph.
    source_node: Chord
    destination_node: Chord
    # Chords for the same endpoints as they appear in the output path.
    source_chord: Chord
    destination_chord: Chord

    # --- Edge description -------------------------------------------------
    transposition: Pitch | None = None
    movements: dict[int, int] = field(default_factory=dict)

    # --- Scoring ------------------------------------------------------------
    # Raw per-factor scores, keyed by factor name.
    scores: dict[str, float] = field(default_factory=dict)
    # Per-factor scores after normalization; None marks a factor that was
    # not used for this candidate set.
    normalized_scores: dict[str, float | None] = field(default_factory=dict)
    weight: float = 0.0  # computed later by compute_weights

    # --- Tracking state before / after taking this step --------------------
    last_visited_counts_before: dict | None = None
    last_visited_counts_after: dict | None = None
    sustain_counts_before: tuple[int, ...] | None = None
    sustain_counts_after: tuple[int, ...] | None = None

    # --- Path state to adopt if this step is committed ---------------------
    new_cumulative_trans: Pitch | None = None
    new_voice_map: list[int] = field(default_factory=list)

    # Edge attribute dict this step was built from.
    edge_data: dict = field(default_factory=dict)
class Path:
    """Stores the complete state of a generated path.

    A path is an initial chord plus a list of committed ``PathStep`` objects.
    The class also carries the running state needed to build and score the
    next set of candidate steps: the cumulative transposition, the voice map,
    and the graph-node / voice counts used for visit and sustain tracking.

    Typical cycle: ``init_state()`` once, then repeatedly
    ``get_candidates()`` -> ``compute_weights()`` -> ``step(chosen)``.
    """

    # Soft scoring factors shared by compute_weights() and get_influence():
    # (score key, influence key, config weight key, default weight).
    # Order matters: it fixes the multiplication order in compute_weights()
    # and the key order of the dict returned by get_influence().
    _SOFT_FACTORS = (
        ("melodic_threshold", "melodic", "weight_melodic", 1),
        ("contrary_motion", "contrary_motion", "weight_contrary_motion", 0),
        ("dca_hamiltonian", "dca_hamiltonian", "weight_dca_hamiltonian", 1),
        (
            "dca_voice_movement",
            "dca_voice_movement",
            "weight_dca_voice_movement",
            1,
        ),
        ("target_register", "target_register", "weight_target_register", 1),
    )

    def __init__(
        self, initial_chord: Chord | None, weights_config: dict[str, Any] | None = None
    ):
        """Create an empty path.

        Args:
            initial_chord: First chord of the path (may be None).
            weights_config: Configuration dict read by the factor methods;
                defaults to an empty dict.
        """
        self.initial_chord = initial_chord
        self.steps: list[PathStep] = []
        self.weights_config = weights_config if weights_config is not None else {}
        # State needed for step computation
        self._voice_map: list[int] = []  # which voice is at each position
        self._cumulative_trans: Pitch | None = None  # cumulative transposition
        self._graph_nodes: set = set()  # all graph nodes for visit tracking
        self._num_voices: int = 0  # number of voices

    def init_state(
        self, graph_nodes: set, num_voices: int, initial_chord: Chord
    ) -> None:
        """Initialize state after graph is known.

        Sets the identity voice map, a zero cumulative transposition built
        from ``initial_chord.dims``, and the node set / voice count used for
        tracking. Does not modify ``self.initial_chord``.
        """
        self._graph_nodes = graph_nodes
        self._num_voices = num_voices
        self._voice_map = list(range(num_voices))  # voice i at position i
        dims = initial_chord.dims
        self._cumulative_trans = Pitch((0,) * len(dims), dims)

    def get_candidates(
        self, out_edges: list, path_chords: list[Chord]
    ) -> list[PathStep]:
        """Generate candidates from graph edges using current state.

        Applies current cumulative_trans and voice_map to create
        destination_chord, and computes all factor scores.

        Args:
            out_edges: Edges indexable as ``(source_node, destination_node,
                edge_data)``; ``edge_data`` is a dict read for
                ``"transposition"`` and ``"movements"`` here and passed to
                the factor methods.
            path_chords: Output chords so far; the last one is the source
                chord of every candidate.

        Returns:
            One ``PathStep`` per edge, or ``[]`` if either argument is empty.
            Path state is not modified; call ``step()`` to commit a choice.
        """
        if not out_edges or not path_chords:
            return []

        source_chord = path_chords[-1]
        # Identical for every edge; hoisted out of the loop.
        sustain_before = self._get_sustain_counts()

        candidates = []
        for edge in out_edges:
            source_node = edge[0]
            destination_node = edge[1]
            edge_data = edge[2]
            trans = edge_data.get("transposition")
            movement = edge_data.get("movements", {})

            # New cumulative transposition, reused to transpose the
            # destination node (previously computed twice).
            if trans is not None and self._cumulative_trans is not None:
                new_cumulative_trans = self._cumulative_trans.transpose(trans)
                transposed = destination_node.transpose(new_cumulative_trans)
            else:
                new_cumulative_trans = self._cumulative_trans
                transposed = destination_node

            # Route each voice through the edge's movement map.
            new_voice_map: list = [None] * len(self._voice_map)
            for src_idx, dest_idx in movement.items():
                new_voice_map[dest_idx] = self._voice_map[src_idx]

            # Apply voice map (reorder pitches)
            reordered_pitches = tuple(
                transposed.pitches[voice] for voice in new_voice_map
            )
            destination_chord = Chord(reordered_pitches, destination_node.dims)

            # Fresh dict per candidate so steps never share a mutable snapshot.
            last_visited_before = self._get_last_visited_counts()

            # Compute all factor scores
            config = self.weights_config
            scores = {
                "direct_tuning": self._factor_direct_tuning(edge_data, config),
                "voice_crossing": self._factor_voice_crossing(edge_data, config),
                "melodic_threshold": self._factor_melodic_threshold(
                    edge_data, config
                ),
                "contrary_motion": self._factor_contrary_motion(edge_data, config),
                "dca_hamiltonian": self._factor_dca_hamiltonian(
                    destination_node, last_visited_before, config
                ),
                "dca_voice_movement": self._factor_dca_voice_movement(
                    source_chord, destination_chord, sustain_before, config
                ),
                "target_register": self._factor_target_register(
                    path_chords, destination_chord, config
                ),
            }

            # AFTER state: every node ages by one, the destination resets.
            last_visited_after = {
                node: count + 1 for node, count in last_visited_before.items()
            }
            last_visited_after[destination_node] = 0

            # A voice's sustain count grows while its pitch (in cents) is
            # unchanged and resets to zero when it moves.
            sustain_after = tuple(
                count + 1
                if source_chord.pitches[voice_idx].to_cents()
                == destination_chord.pitches[voice_idx].to_cents()
                else 0
                for voice_idx, count in enumerate(sustain_before)
            )

            candidates.append(
                PathStep(
                    source_node=source_node,
                    destination_node=destination_node,
                    source_chord=source_chord,
                    destination_chord=destination_chord,
                    transposition=trans,
                    movements=movement,
                    scores=scores,
                    last_visited_counts_before=last_visited_before,
                    last_visited_counts_after=last_visited_after,
                    sustain_counts_before=sustain_before,
                    sustain_counts_after=sustain_after,
                    new_cumulative_trans=new_cumulative_trans,
                    new_voice_map=new_voice_map,
                    edge_data=edge_data,
                )
            )
        return candidates

    @staticmethod
    def _sum_normalize(values: list) -> list | None:
        """Normalize values to sum to 1.

        Returns None when the factor cannot discriminate between candidates
        (zero total, or all values equal).
        """
        total = sum(values)
        if total == 0 or len(set(values)) <= 1:
            return None
        return [v / total for v in values]

    def compute_weights(self, candidates: list[PathStep], config: dict) -> list[float]:
        """Compute weights from raw scores for all candidates.

        ``direct_tuning`` and ``voice_crossing`` act as hard gates: a zero
        there gives the candidate weight 0.0 and leaves its
        ``normalized_scores`` untouched. The remaining factors are
        sum-normalized across candidates and multiplied in, each scaled by
        its configured weight; factors that do not discriminate are skipped.

        Args:
            candidates: Steps whose ``scores`` have been filled in.
            config: Dict holding the ``weight_*`` entries.

        Returns:
            One combined weight per candidate, in order. Also updates each
            step's ``weight`` and ``normalized_scores`` fields.
        """
        if not candidates:
            return []

        # One (score key, normalized column or None, factor weight) per factor.
        factors = [
            (
                score_key,
                self._sum_normalize([c.scores.get(score_key, 0) for c in candidates]),
                config.get(config_key, default),
            )
            for score_key, _, config_key, default in self._SOFT_FACTORS
        ]

        weights = []
        for i, step in enumerate(candidates):
            scores = step.scores

            # Hard gates.
            w = 1.0 * scores.get("direct_tuning", 0)
            if w != 0:
                w *= scores.get("voice_crossing", 0)
            if w == 0:
                step.weight = 0.0
                weights.append(0.0)
                continue

            for _, norm, factor_weight in factors:
                if norm is not None:
                    w *= norm[i] * factor_weight

            # NOTE(review): the step stores the sharpened weight (w ** 16)
            # while the returned list carries the unsharpened w. Kept as-is;
            # confirm which of the two callers are meant to sample from.
            step.weight = w**16
            weights.append(w)

            # Store normalized scores (0-1 range) for influence calculation
            step.normalized_scores = {
                score_key: (norm[i] if norm is not None else None)
                for score_key, norm, _ in factors
            }
        return weights

    # ========== Factor Methods ==========

    def _factor_melodic_threshold(self, edge_data: dict, config: dict) -> float:
        """Returns continuous score based on melodic threshold.

        Each entry of ``edge_data["cent_diffs"]`` scores 1.0 when it is zero
        or within ``[melodic_threshold_min, melodic_threshold_max]``. Smaller
        non-zero moves score ``(abs / min) ** 3``. Larger moves fall off as
        ``((1200 - abs) / (1200 - max)) ** 3`` and are clamped to 0.0 at or
        beyond 1200 cents, or when ``max >= 1200`` leaves no ramp. The result
        is the product over all voices (1.0 if there are no diffs).
        """
        melodic_min = config.get("melodic_threshold_min", 0)
        melodic_max = config.get("melodic_threshold_max", float("inf"))
        cent_diffs = edge_data.get("cent_diffs", [])
        if not cent_diffs:
            return 1.0

        product = 1.0
        for cents in cent_diffs:
            abs_cents = abs(cents)
            if abs_cents == 0:
                score = 1.0
            elif abs_cents < melodic_min:
                score = (abs_cents / melodic_min) ** 3
            elif abs_cents > melodic_max:
                ramp = 1200 - melodic_max
                if ramp <= 0 or abs_cents >= 1200:
                    # Avoids division by zero and negative or >1 scores.
                    score = 0.0
                else:
                    score = ((1200 - abs_cents) / ramp) ** 3
            else:
                score = 1.0
            product *= score
        return product

    def _factor_direct_tuning(self, edge_data: dict, config: dict) -> float:
        """Returns 1.0 if directly tunable (or disabled), 0.0 otherwise."""
        if config.get("weight_direct_tuning", 1) == 0:
            return 1.0
        if not config.get("direct_tuning", True):
            return 1.0
        return 1.0 if edge_data.get("is_directly_tunable", False) else 0.0

    def _factor_voice_crossing(self, edge_data: dict, config: dict) -> float:
        """Returns 1.0 if no voice crossing (or allowed), 0.0 if crossing."""
        if config.get("voice_crossing_allowed", False):
            return 1.0
        return 0.0 if edge_data.get("voice_crossing", False) else 1.0

    def _factor_contrary_motion(self, edge_data: dict, config: dict) -> float:
        """Returns factor based on contrary motion.

        1.0 when the moving voices split evenly between up and down, falling
        to 0.0 when they all move the same way. Returns 0.0 if fewer than two
        voices move, and 1.0 when ``weight_contrary_motion`` is 0.
        """
        if config.get("weight_contrary_motion", 0) == 0:
            return 1.0
        cent_diffs = edge_data.get("cent_diffs", [])
        num_up = sum(1 for d in cent_diffs if d > 0)
        num_down = sum(1 for d in cent_diffs if d < 0)
        num_moving = num_up + num_down
        if num_moving < 2:
            return 0.0
        ideal_up = num_moving / 2
        distance = abs(num_up - ideal_up)
        return max(0.0, 1.0 - (distance / ideal_up))

    def _factor_dca_hamiltonian(
        self, destination_node, last_visited_counts: dict, config: dict
    ) -> float:
        """Returns score based on how long since node was last visited.

        The score is the node's entry in ``last_visited_counts`` (0 if
        absent). Returns 1.0 when ``weight_dca_hamiltonian`` is 0 and 0.0
        when no counts are supplied.
        """
        if config.get("weight_dca_hamiltonian", 1) == 0:
            return 1.0
        if last_visited_counts is None:
            return 0.0
        return float(last_visited_counts.get(destination_node, 0))

    def _factor_dca_voice_movement(
        self,
        source_chord: Chord,
        destination_chord: Chord,
        sustain_counts: tuple[int, ...],
        config: dict,
    ) -> float:
        """Returns the summed sustain counts of the voices that change pitch.

        A voice changes when its pitch in cents differs between
        ``source_chord`` and ``destination_chord``. Returns the neutral 1.0
        when ``weight_dca_voice_movement`` is 0, when ``sustain_counts`` is
        None, or when there are no voices.
        """
        if config.get("weight_dca_voice_movement", 1) == 0:
            return 1.0
        if sustain_counts is None:
            return 1.0
        if len(sustain_counts) == 0:
            return 1.0

        current_cents = [p.to_cents() for p in source_chord.pitches]
        candidate_cents = [p.to_cents() for p in destination_chord.pitches]
        sum_changing = sum(
            count
            for voice_idx, count in enumerate(sustain_counts)
            if current_cents[voice_idx] != candidate_cents[voice_idx]
        )
        return float(sum_changing)

    @staticmethod
    def _average_cents(chord) -> float:
        """Mean of ``to_cents()`` over the chord's pitches."""
        return sum(p.to_cents() for p in chord.pitches) / len(chord.pitches)

    def _factor_target_register(
        self,
        path_chords: list[Chord],
        destination_chord: Chord,
        config: dict,
    ) -> float:
        """Returns factor based on movement toward target.

        The target is the first chord's average cents plus
        ``(len(path_chords) / max_path) * target_register_octaves * 1200``.
        Candidates whose average is closer to it than the current chord's
        score above 1.0 (up to 2.0); those farther away score below 1.0,
        floored at 0.1. Returns the neutral 1.0 when the factor is disabled,
        the path is empty, ``max_path`` is not positive, the target is not
        positive, or the current chord already sits on the target.
        """
        if config.get("weight_target_register", 1) == 0:
            return 1.0
        if not config.get("target_register", False):
            return 1.0
        if not path_chords:
            return 1.0

        target_octaves = config.get("target_register_octaves", 2.0)
        max_path = config.get("max_path", 50)
        if max_path <= 0:
            # Progress is undefined; previously a ZeroDivisionError for 0.
            return 1.0

        target_cents = target_octaves * 1200
        start_avg_cents = self._average_cents(path_chords[0])
        progress = len(path_chords) / max_path
        current_target = start_avg_cents + (progress * target_cents)
        current_avg_cents = self._average_cents(path_chords[-1])
        candidate_avg_cents = self._average_cents(destination_chord)

        if current_target <= 0:
            return 1.0

        dist_before = abs(current_avg_cents - current_target)
        dist_after = abs(candidate_avg_cents - current_target)
        if dist_before == 0:
            return 1.0
        if dist_after < dist_before:
            return 1.0 + (dist_before - dist_after) / dist_before
        if dist_after > dist_before:
            return max(0.1, 1.0 - (dist_after - dist_before) / dist_before)
        return 1.0

    # ========== State Methods ==========

    def _get_last_visited_counts(self) -> dict:
        """Get last visited counts from the last step, or initialize fresh.

        Always returns a new dict, so callers may mutate the result.
        """
        if self.steps:
            counts = self.steps[-1].last_visited_counts_after
            if counts is not None:
                return dict(counts)
        return {node: 0 for node in self._graph_nodes}

    def _get_sustain_counts(self) -> tuple:
        """Get sustain counts from the last step, or initialize fresh."""
        if self.steps:
            counts = self.steps[-1].sustain_counts_after
            if counts is not None:
                return counts
        return (0,) * self._num_voices

    def step(self, step: PathStep) -> PathStep:
        """Commit a chosen candidate to the path.

        All state was computed in get_candidates(); this just applies the
        stored new state and appends the step. Returns the same step.
        """
        # Apply stored new state
        self._cumulative_trans = step.new_cumulative_trans
        self._voice_map = step.new_voice_map
        # Commit
        self.steps.append(step)
        return step

    @property
    def graph_chords(self) -> list[Chord]:
        """Initial chord followed by each step's destination graph node."""
        return [self.initial_chord] + [s.destination_node for s in self.steps]

    @property
    def output_chords(self) -> list[Chord]:
        """Initial chord followed by each step's destination chord (transposed)."""
        return [self.initial_chord] + [s.destination_chord for s in self.steps]

    def __len__(self) -> int:
        """Total number of chords in path."""
        return len(self.steps) + 1

    def __iter__(self):
        """Iterate over output chords."""
        return iter(self.output_chords)

    def get_influence(self, weights: dict[str, Any]) -> dict[str, float]:
        """Compute weighted score contribution per factor for chosen candidates.

        For every committed step, each factor's normalized score (missing or
        None counts as 0) is multiplied by its weight from ``weights`` and
        accumulated.

        Returns:
            Dict with keys ``melodic``, ``contrary_motion``,
            ``dca_hamiltonian``, ``dca_voice_movement``, ``target_register``.
        """
        factors = [
            (score_key, influence_key, weights.get(config_key, default))
            for score_key, influence_key, config_key, default in self._SOFT_FACTORS
        ]
        influence = {influence_key: 0.0 for _, influence_key, _ in factors}
        for step in self.steps:
            norm = step.normalized_scores
            for score_key, influence_key, factor_weight in factors:
                influence[influence_key] += (norm.get(score_key) or 0) * factor_weight
        return influence