Refactor: Move factor methods to Path class, add normalized_scores
- Move all _factor_* methods from pathfinder.py to path.py - Add get_candidates() and compute_weights() to Path class - Simplify step() to just commit chosen candidate - Add normalized_scores field for consistent influence calculation - Remove duplicate transposition/voice_map logic between get_candidates and step - dca_voice_movement and target_range now use destination_chord directly
This commit is contained in:
parent
482f2b0df5
commit
7809fa5a76
431
src/path.py
431
src/path.py
|
|
@ -22,11 +22,14 @@ class PathStep:
|
|||
transposition: Pitch | None = None
|
||||
movements: dict[int, int] = field(default_factory=dict)
|
||||
scores: dict[str, float] = field(default_factory=dict)
|
||||
weight: float = 0.0 # computed later by _compute_weights
|
||||
normalized_scores: dict[str, float | None] = field(default_factory=dict)
|
||||
weight: float = 0.0 # computed later by compute_weights
|
||||
last_visited_counts_before: dict | None = None
|
||||
last_visited_counts_after: dict | None = None
|
||||
sustain_counts_before: tuple[int, ...] | None = None
|
||||
sustain_counts_after: tuple[int, ...] | None = None
|
||||
new_cumulative_trans: Pitch | None = None
|
||||
new_voice_map: list[int] = field(default_factory=list)
|
||||
|
||||
|
||||
class Path:
|
||||
|
|
@ -56,6 +59,350 @@ class Path:
|
|||
dims = initial_chord.dims
|
||||
self._cumulative_trans = Pitch(tuple(0 for _ in range(len(dims))), dims)
|
||||
|
||||
def get_candidates(
|
||||
self, out_edges: list, path_chords: list[Chord]
|
||||
) -> list[PathStep]:
|
||||
"""Generate candidates from graph edges using current state.
|
||||
|
||||
Applies current cumulative_trans and voice_map to create destination_chord.
|
||||
Computes all factor scores.
|
||||
"""
|
||||
if not out_edges or not path_chords:
|
||||
return []
|
||||
|
||||
source_chord = path_chords[-1]
|
||||
candidates = []
|
||||
|
||||
for edge in out_edges:
|
||||
source_node = edge[0]
|
||||
destination_node = edge[1]
|
||||
edge_data = edge[2]
|
||||
|
||||
trans = edge_data.get("transposition")
|
||||
movement = edge_data.get("movements", {})
|
||||
|
||||
# Compute new state after this candidate
|
||||
if trans is not None and self._cumulative_trans is not None:
|
||||
new_cumulative_trans = self._cumulative_trans.transpose(trans)
|
||||
else:
|
||||
new_cumulative_trans = self._cumulative_trans
|
||||
|
||||
new_voice_map = [None] * len(self._voice_map)
|
||||
for src_idx, dest_idx in movement.items():
|
||||
new_voice_map[dest_idx] = self._voice_map[src_idx]
|
||||
|
||||
# Transpose destination node using current cumulative_trans
|
||||
if trans is not None and self._cumulative_trans is not None:
|
||||
transposed = destination_node.transpose(
|
||||
self._cumulative_trans.transpose(trans)
|
||||
)
|
||||
else:
|
||||
transposed = destination_node
|
||||
|
||||
# Apply voice map (reorder pitches)
|
||||
reordered_pitches = tuple(
|
||||
transposed.pitches[new_voice_map[i]] for i in range(len(new_voice_map))
|
||||
)
|
||||
destination_chord = Chord(reordered_pitches, destination_node.dims)
|
||||
|
||||
# Compute all factor scores
|
||||
last_visited_before = self._get_last_visited_counts()
|
||||
sustain_before = self._get_sustain_counts()
|
||||
|
||||
scores = {
|
||||
"direct_tuning": self._factor_direct_tuning(
|
||||
edge_data, self.weights_config
|
||||
),
|
||||
"voice_crossing": self._factor_voice_crossing(
|
||||
edge_data, self.weights_config
|
||||
),
|
||||
"melodic_threshold": self._factor_melodic_threshold(
|
||||
edge_data, self.weights_config
|
||||
),
|
||||
"contrary_motion": self._factor_contrary_motion(
|
||||
edge_data, self.weights_config
|
||||
),
|
||||
"dca_hamiltonian": self._factor_dca_hamiltonian(
|
||||
destination_node, last_visited_before, self.weights_config
|
||||
),
|
||||
"dca_voice_movement": self._factor_dca_voice_movement(
|
||||
source_chord,
|
||||
destination_chord,
|
||||
sustain_before,
|
||||
self.weights_config,
|
||||
),
|
||||
"target_range": self._factor_target_range(
|
||||
path_chords,
|
||||
destination_chord,
|
||||
self.weights_config,
|
||||
),
|
||||
}
|
||||
|
||||
# Compute AFTER state for this candidate
|
||||
last_visited_after = dict(last_visited_before)
|
||||
for node in last_visited_after:
|
||||
last_visited_after[node] += 1
|
||||
last_visited_after[destination_node] = 0
|
||||
|
||||
sustain_after = list(sustain_before)
|
||||
for voice_idx in range(len(sustain_after)):
|
||||
curr_cents = source_chord.pitches[voice_idx].to_cents()
|
||||
next_cents = destination_chord.pitches[voice_idx].to_cents()
|
||||
if curr_cents == next_cents:
|
||||
sustain_after[voice_idx] += 1
|
||||
else:
|
||||
sustain_after[voice_idx] = 0
|
||||
|
||||
step = PathStep(
|
||||
source_node=source_node,
|
||||
destination_node=destination_node,
|
||||
source_chord=source_chord,
|
||||
destination_chord=destination_chord,
|
||||
transposition=trans,
|
||||
movements=movement,
|
||||
scores=scores,
|
||||
last_visited_counts_before=last_visited_before,
|
||||
last_visited_counts_after=last_visited_after,
|
||||
sustain_counts_before=sustain_before,
|
||||
sustain_counts_after=tuple(sustain_after),
|
||||
new_cumulative_trans=new_cumulative_trans,
|
||||
new_voice_map=new_voice_map,
|
||||
)
|
||||
|
||||
candidates.append(step)
|
||||
|
||||
return candidates
|
||||
|
||||
def compute_weights(self, candidates: list[PathStep], config: dict) -> list[float]:
|
||||
"""Compute weights from raw scores for all candidates.
|
||||
|
||||
Updates each step's weight field.
|
||||
"""
|
||||
if not candidates:
|
||||
return []
|
||||
|
||||
melodic_values = [c.scores.get("melodic_threshold", 0) for c in candidates]
|
||||
contrary_values = [c.scores.get("contrary_motion", 0) for c in candidates]
|
||||
hamiltonian_values = [c.scores.get("dca_hamiltonian", 0) for c in candidates]
|
||||
dca_values = [c.scores.get("dca_voice_movement", 0) for c in candidates]
|
||||
target_values = [c.scores.get("target_range", 0) for c in candidates]
|
||||
|
||||
def sum_normalize(values: list) -> list | None:
|
||||
"""Normalize values to sum to 1. Returns None if no discrimination."""
|
||||
total = sum(values)
|
||||
if total == 0 or len(set(values)) <= 1:
|
||||
return None
|
||||
return [v / total for v in values]
|
||||
|
||||
melodic_norm = sum_normalize(melodic_values)
|
||||
contrary_norm = sum_normalize(contrary_values)
|
||||
hamiltonian_norm = sum_normalize(hamiltonian_values)
|
||||
dca_norm = sum_normalize(dca_values)
|
||||
target_norm = sum_normalize(target_values)
|
||||
|
||||
weights = []
|
||||
for i, step in enumerate(candidates):
|
||||
scores = step.scores
|
||||
w = 1.0
|
||||
|
||||
w *= scores.get("direct_tuning", 0)
|
||||
if w == 0:
|
||||
step.weight = 0.0
|
||||
weights.append(0.0)
|
||||
continue
|
||||
|
||||
w *= scores.get("voice_crossing", 0)
|
||||
if w == 0:
|
||||
step.weight = 0.0
|
||||
weights.append(0.0)
|
||||
continue
|
||||
|
||||
if melodic_norm:
|
||||
w += melodic_norm[i] * config.get("weight_melodic", 1)
|
||||
if contrary_norm:
|
||||
w += contrary_norm[i] * config.get("weight_contrary_motion", 0)
|
||||
if hamiltonian_norm:
|
||||
w += hamiltonian_norm[i] * config.get("weight_dca_hamiltonian", 1)
|
||||
if dca_norm:
|
||||
w += dca_norm[i] * config.get("weight_dca_voice_movement", 1)
|
||||
if target_norm:
|
||||
w += target_norm[i] * config.get("weight_target_range", 1)
|
||||
|
||||
step.weight = w
|
||||
weights.append(w)
|
||||
|
||||
# Store normalized scores (0-1 range) for influence calculation
|
||||
step.normalized_scores = {
|
||||
"melodic_threshold": melodic_norm[i] if melodic_norm else None,
|
||||
"contrary_motion": contrary_norm[i] if contrary_norm else None,
|
||||
"dca_hamiltonian": hamiltonian_norm[i] if hamiltonian_norm else None,
|
||||
"dca_voice_movement": dca_norm[i] if dca_norm else None,
|
||||
"target_range": target_norm[i] if target_norm else None,
|
||||
}
|
||||
|
||||
return weights
|
||||
|
||||
# ========== Factor Methods ==========
|
||||
|
||||
def _factor_melodic_threshold(self, edge_data: dict, config: dict) -> float:
|
||||
"""Returns continuous score based on melodic threshold."""
|
||||
melodic_min = config.get("melodic_threshold_min", 0)
|
||||
melodic_max = config.get("melodic_threshold_max", float("inf"))
|
||||
|
||||
cent_diffs = edge_data.get("cent_diffs", [])
|
||||
|
||||
if not cent_diffs:
|
||||
return 1.0
|
||||
|
||||
product = 1.0
|
||||
for cents in cent_diffs:
|
||||
abs_cents = abs(cents)
|
||||
if abs_cents == 0:
|
||||
score = 1.0
|
||||
elif abs_cents < melodic_min:
|
||||
score = (abs_cents / melodic_min) ** 3
|
||||
elif abs_cents > melodic_max:
|
||||
score = ((1200 - abs_cents) / (1200 - melodic_max)) ** 3
|
||||
else:
|
||||
score = 1.0
|
||||
product *= score
|
||||
|
||||
return product
|
||||
|
||||
def _factor_direct_tuning(self, edge_data: dict, config: dict) -> float:
|
||||
"""Returns 1.0 if directly tunable (or disabled), 0.0 otherwise."""
|
||||
if config.get("weight_direct_tuning", 1) == 0:
|
||||
return 1.0
|
||||
|
||||
if config.get("direct_tuning", True):
|
||||
if edge_data.get("is_directly_tunable", False):
|
||||
return 1.0
|
||||
return 0.0
|
||||
return 1.0
|
||||
|
||||
def _factor_voice_crossing(self, edge_data: dict, config: dict) -> float:
|
||||
"""Returns 1.0 if no voice crossing (or allowed), 0.0 if crossing."""
|
||||
if config.get("voice_crossing_allowed", False):
|
||||
return 1.0
|
||||
|
||||
if edge_data.get("voice_crossing", False):
|
||||
return 0.0
|
||||
return 1.0
|
||||
|
||||
def _factor_contrary_motion(self, edge_data: dict, config: dict) -> float:
|
||||
"""Returns factor based on contrary motion."""
|
||||
if config.get("weight_contrary_motion", 0) == 0:
|
||||
return 1.0
|
||||
|
||||
cent_diffs = edge_data.get("cent_diffs", [])
|
||||
|
||||
num_up = sum(1 for d in cent_diffs if d > 0)
|
||||
num_down = sum(1 for d in cent_diffs if d < 0)
|
||||
num_moving = num_up + num_down
|
||||
|
||||
if num_moving < 2:
|
||||
return 0.0
|
||||
|
||||
ideal_up = num_moving / 2
|
||||
distance = abs(num_up - ideal_up)
|
||||
return max(0.0, 1.0 - (distance / ideal_up))
|
||||
|
||||
def _factor_dca_hamiltonian(
|
||||
self, destination_node, last_visited_counts: dict, config: dict
|
||||
) -> float:
|
||||
"""Returns score based on how long since node was last visited."""
|
||||
if config.get("weight_dca_hamiltonian", 1) == 0:
|
||||
return 1.0
|
||||
|
||||
if last_visited_counts is None:
|
||||
return 0.0
|
||||
|
||||
visit_count = last_visited_counts.get(destination_node, 0)
|
||||
return float(visit_count)
|
||||
|
||||
def _factor_dca_voice_movement(
|
||||
self,
|
||||
source_chord: Chord,
|
||||
destination_chord: Chord,
|
||||
sustain_counts: tuple[int, ...],
|
||||
config: dict,
|
||||
) -> float:
|
||||
"""Returns probability that voices will change."""
|
||||
if config.get("weight_dca_voice_movement", 1) == 0:
|
||||
return 1.0
|
||||
|
||||
if sustain_counts is None:
|
||||
return 1.0
|
||||
|
||||
num_voices = len(sustain_counts)
|
||||
if num_voices == 0:
|
||||
return 1.0
|
||||
|
||||
current_cents = [p.to_cents() for p in source_chord.pitches]
|
||||
candidate_cents = [p.to_cents() for p in destination_chord.pitches]
|
||||
|
||||
sum_changing = 0
|
||||
sum_all = sum(sustain_counts)
|
||||
|
||||
for voice_idx in range(num_voices):
|
||||
if current_cents[voice_idx] != candidate_cents[voice_idx]:
|
||||
sum_changing += sustain_counts[voice_idx]
|
||||
|
||||
return sum_changing
|
||||
|
||||
def _factor_target_range(
|
||||
self,
|
||||
path_chords: list[Chord],
|
||||
destination_chord: Chord,
|
||||
config: dict,
|
||||
) -> float:
|
||||
"""Returns factor based on movement toward target."""
|
||||
if config.get("weight_target_range", 1) == 0:
|
||||
return 1.0
|
||||
|
||||
if not config.get("target_range", False):
|
||||
return 1.0
|
||||
|
||||
if len(path_chords) == 0:
|
||||
return 1.0
|
||||
|
||||
target_octaves = config.get("target_range_octaves", 2.0)
|
||||
max_path = config.get("max_path", 50)
|
||||
target_cents = target_octaves * 1200
|
||||
|
||||
start_avg_cents = sum(p.to_cents() for p in path_chords[0].pitches) / len(
|
||||
path_chords[0].pitches
|
||||
)
|
||||
progress = len(path_chords) / max_path
|
||||
current_target = start_avg_cents + (progress * target_cents)
|
||||
|
||||
current_chord = path_chords[-1]
|
||||
current_avg_cents = sum(p.to_cents() for p in current_chord.pitches) / len(
|
||||
current_chord.pitches
|
||||
)
|
||||
|
||||
candidate_avg_cents = sum(
|
||||
p.to_cents() for p in destination_chord.pitches
|
||||
) / len(destination_chord.pitches)
|
||||
|
||||
if current_target <= 0:
|
||||
return 1.0
|
||||
|
||||
dist_before = abs(current_avg_cents - current_target)
|
||||
dist_after = abs(candidate_avg_cents - current_target)
|
||||
|
||||
if dist_before == 0:
|
||||
return 1.0
|
||||
|
||||
if dist_after < dist_before:
|
||||
return 1.0 + (dist_before - dist_after) / dist_before
|
||||
elif dist_after > dist_before:
|
||||
return max(0.1, 1.0 - (dist_after - dist_before) / dist_before)
|
||||
else:
|
||||
return 1.0
|
||||
|
||||
# ========== State Methods ==========
|
||||
|
||||
def _get_last_visited_counts(self) -> dict:
|
||||
"""Get last visited counts from the last step, or initialize fresh."""
|
||||
if self.steps:
|
||||
|
|
@ -63,7 +410,6 @@ class Path:
|
|||
if last_step.last_visited_counts_after is not None:
|
||||
return dict(last_step.last_visited_counts_after)
|
||||
|
||||
# Initialize fresh: all nodes start at 0 (except initial which we set to 0 explicitly)
|
||||
return {node: 0 for node in self._graph_nodes}
|
||||
|
||||
def _get_sustain_counts(self) -> tuple:
|
||||
|
|
@ -73,52 +419,19 @@ class Path:
|
|||
if last_step.sustain_counts_after is not None:
|
||||
return last_step.sustain_counts_after
|
||||
|
||||
# Initialize fresh: all voices start at 0
|
||||
return tuple(0 for _ in range(self._num_voices))
|
||||
|
||||
def step(self, step: PathStep) -> PathStep:
|
||||
"""Add a completed step to the path.
|
||||
"""Commit a chosen candidate to the path.
|
||||
|
||||
Takes a PathStep (computed as a hypothetical step), updates internal state,
|
||||
and adds it to the path.
|
||||
All state was computed in get_candidates().
|
||||
This just applies the stored new state and commits the step.
|
||||
"""
|
||||
# Update cumulative transposition
|
||||
if step.transposition is not None:
|
||||
self._cumulative_trans = self._cumulative_trans.transpose(
|
||||
step.transposition
|
||||
)
|
||||
|
||||
# Update voice map based on movement
|
||||
new_voice_map = [None] * len(self._voice_map)
|
||||
for src_idx, dest_idx in step.movements.items():
|
||||
new_voice_map[dest_idx] = self._voice_map[src_idx]
|
||||
self._voice_map = new_voice_map
|
||||
|
||||
# Get BEFORE state from last step (or initialize fresh)
|
||||
last_visited_before = self._get_last_visited_counts()
|
||||
sustain_before = self._get_sustain_counts()
|
||||
|
||||
# Compute AFTER state
|
||||
last_visited_after = dict(last_visited_before)
|
||||
for node in last_visited_after:
|
||||
last_visited_after[node] += 1
|
||||
last_visited_after[step.destination_node] = 0
|
||||
|
||||
sustain_after = list(sustain_before)
|
||||
for voice_idx in range(len(sustain_after)):
|
||||
curr_cents = step.source_chord.pitches[voice_idx].to_cents()
|
||||
next_cents = step.destination_chord.pitches[voice_idx].to_cents()
|
||||
if curr_cents == next_cents:
|
||||
sustain_after[voice_idx] += 1
|
||||
else:
|
||||
sustain_after[voice_idx] = 0
|
||||
|
||||
# Update step with computed state
|
||||
step.last_visited_counts_before = last_visited_before
|
||||
step.last_visited_counts_after = last_visited_after
|
||||
step.sustain_counts_before = sustain_before
|
||||
step.sustain_counts_after = tuple(sustain_after)
|
||||
# Apply stored new state
|
||||
self._cumulative_trans = step.new_cumulative_trans
|
||||
self._voice_map = step.new_voice_map
|
||||
|
||||
# Commit
|
||||
self.steps.append(step)
|
||||
return step
|
||||
|
||||
|
|
@ -143,8 +456,7 @@ class Path:
|
|||
def get_influence(self, weights: dict[str, Any]) -> dict[str, float]:
|
||||
"""Compute weighted score contribution per factor for chosen candidates.
|
||||
|
||||
Returns a dict mapping factor name to accumulated influence (weight * score)
|
||||
for all steps in the path.
|
||||
Uses normalized scores (0-1 range) for consistent influence across factors.
|
||||
"""
|
||||
influence = {
|
||||
"melodic": 0.0,
|
||||
|
|
@ -154,24 +466,25 @@ class Path:
|
|||
"target_range": 0.0,
|
||||
}
|
||||
|
||||
for step in self.steps:
|
||||
scores = step.scores
|
||||
w_melodic = weights.get("weight_melodic", 1)
|
||||
w_contrary = weights.get("weight_contrary_motion", 0)
|
||||
w_hamiltonian = weights.get("weight_dca_hamiltonian", 1)
|
||||
w_dca = weights.get("weight_dca_voice_movement", 1)
|
||||
w_target = weights.get("weight_target_range", 1)
|
||||
w_melodic = weights.get("weight_melodic", 1)
|
||||
w_contrary = weights.get("weight_contrary_motion", 0)
|
||||
w_hamiltonian = weights.get("weight_dca_hamiltonian", 1)
|
||||
w_dca = weights.get("weight_dca_voice_movement", 1)
|
||||
w_target = weights.get("weight_target_range", 1)
|
||||
|
||||
influence["melodic"] += scores.get("melodic_threshold", 0) * w_melodic
|
||||
for step in self.steps:
|
||||
norm = step.normalized_scores
|
||||
|
||||
influence["melodic"] += (norm.get("melodic_threshold") or 0) * w_melodic
|
||||
influence["contrary_motion"] += (
|
||||
scores.get("contrary_motion", 0) * w_contrary
|
||||
)
|
||||
norm.get("contrary_motion") or 0
|
||||
) * w_contrary
|
||||
influence["dca_hamiltonian"] += (
|
||||
scores.get("dca_hamiltonian", 0) * w_hamiltonian
|
||||
)
|
||||
norm.get("dca_hamiltonian") or 0
|
||||
) * w_hamiltonian
|
||||
influence["dca_voice_movement"] += (
|
||||
scores.get("dca_voice_movement", 0) * w_dca
|
||||
)
|
||||
influence["target_range"] += scores.get("target_range", 0) * w_target
|
||||
norm.get("dca_voice_movement") or 0
|
||||
) * w_dca
|
||||
influence["target_range"] += (norm.get("target_range") or 0) * w_target
|
||||
|
||||
return influence
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@ PathFinder - finds paths through voice leading graphs.
|
|||
|
||||
from __future__ import annotations
|
||||
import networkx as nx
|
||||
from random import choices, seed
|
||||
from random import choices
|
||||
from typing import Callable
|
||||
|
||||
from .chord import Chord
|
||||
|
|
@ -54,29 +54,11 @@ class PathFinder:
|
|||
if not out_edges:
|
||||
break
|
||||
|
||||
# Derive state from last step (or initialize fresh for step 0)
|
||||
if path_obj.steps:
|
||||
last_step = path_obj.steps[-1]
|
||||
sustain_counts = last_step.sustain_counts_after
|
||||
last_visited_counts = last_step.last_visited_counts_after
|
||||
else:
|
||||
# First step - derive from path object's current state
|
||||
sustain_counts = tuple(0 for _ in range(len(path_obj._voice_map)))
|
||||
last_visited_counts = {node: 0 for node in set(self.graph.nodes())}
|
||||
# Build candidates using Path's state and factor methods
|
||||
candidates = path_obj.get_candidates(out_edges, path_obj.output_chords)
|
||||
|
||||
# Build candidates with raw scores
|
||||
candidates = self._build_candidates(
|
||||
out_edges,
|
||||
path_obj.output_chords,
|
||||
weights_config,
|
||||
sustain_counts,
|
||||
path_obj.graph_chords,
|
||||
path_obj._cumulative_trans,
|
||||
last_visited_counts,
|
||||
)
|
||||
|
||||
# Compute weights from raw scores
|
||||
self._compute_weights(candidates, weights_config)
|
||||
# Compute weights using Path's method
|
||||
path_obj.compute_weights(candidates, weights_config)
|
||||
|
||||
# Filter out candidates with zero weight
|
||||
valid_candidates = [c for c in candidates if c.weight > 0]
|
||||
|
|
@ -100,158 +82,6 @@ class PathFinder:
|
|||
|
||||
return path_obj
|
||||
|
||||
def _build_candidates(
|
||||
self,
|
||||
out_edges: list,
|
||||
path: list["Chord"],
|
||||
config: dict,
|
||||
sustain_counts: tuple[int, ...] | None,
|
||||
graph_path: list["Chord"] | None,
|
||||
cumulative_trans: "Pitch | None",
|
||||
last_visited_counts: dict | None,
|
||||
) -> list[PathStep]:
|
||||
"""Build hypothetical path steps with raw factor scores."""
|
||||
if not out_edges:
|
||||
return []
|
||||
|
||||
if not path:
|
||||
return []
|
||||
|
||||
source_chord = path[-1]
|
||||
candidates = []
|
||||
for i, edge in enumerate(out_edges):
|
||||
source_node = edge[0]
|
||||
destination_node = edge[1]
|
||||
edge_data = edge[2]
|
||||
|
||||
trans = edge_data.get("transposition")
|
||||
movement = edge_data.get("movements", {})
|
||||
|
||||
# Transpose destination node
|
||||
if trans is not None and cumulative_trans is not None:
|
||||
transposed = destination_node.transpose(
|
||||
cumulative_trans.transpose(trans)
|
||||
)
|
||||
else:
|
||||
transposed = destination_node
|
||||
|
||||
# Apply voice map
|
||||
voice_map = list(range(len(source_chord.pitches)))
|
||||
new_voice_map = [None] * len(voice_map)
|
||||
for src_idx, dest_idx in movement.items():
|
||||
new_voice_map[dest_idx] = voice_map[src_idx]
|
||||
|
||||
reordered_pitches = tuple(
|
||||
transposed.pitches[new_voice_map[i]] for i in range(len(new_voice_map))
|
||||
)
|
||||
destination_chord = Chord(reordered_pitches, destination_node.dims)
|
||||
|
||||
# All factors - always compute verbatim
|
||||
direct_tuning = self._factor_direct_tuning(edge_data, config)
|
||||
voice_crossing = self._factor_voice_crossing(edge_data, config)
|
||||
melodic = self._factor_melodic_threshold(edge_data, config)
|
||||
contrary = self._factor_contrary_motion(edge_data, config)
|
||||
hamiltonian = self._factor_dca_hamiltonian(
|
||||
edge, last_visited_counts, config
|
||||
)
|
||||
dca_voice = self._factor_dca_voice_movement(
|
||||
edge, path, sustain_counts, config, cumulative_trans
|
||||
)
|
||||
target = self._factor_target_range(edge, path, config, cumulative_trans)
|
||||
|
||||
scores = {
|
||||
"direct_tuning": direct_tuning,
|
||||
"voice_crossing": voice_crossing,
|
||||
"melodic_threshold": melodic,
|
||||
"contrary_motion": contrary,
|
||||
"dca_hamiltonian": hamiltonian,
|
||||
"dca_voice_movement": dca_voice,
|
||||
"target_range": target,
|
||||
}
|
||||
|
||||
step = PathStep(
|
||||
source_node=source_node,
|
||||
destination_node=destination_node,
|
||||
source_chord=source_chord,
|
||||
destination_chord=destination_chord,
|
||||
transposition=trans,
|
||||
movements=movement,
|
||||
scores=scores,
|
||||
)
|
||||
candidates.append(step)
|
||||
|
||||
return candidates
|
||||
|
||||
def _compute_weights(
|
||||
self,
|
||||
candidates: list[PathStep],
|
||||
config: dict,
|
||||
) -> list[float]:
|
||||
"""Compute weights from raw scores for all candidates.
|
||||
|
||||
Returns a list of weights, and updates each step's weight field.
|
||||
"""
|
||||
if not candidates:
|
||||
return []
|
||||
|
||||
# Collect raw values for normalization
|
||||
melodic_values = [c.scores.get("melodic_threshold", 0) for c in candidates]
|
||||
contrary_values = [c.scores.get("contrary_motion", 0) for c in candidates]
|
||||
hamiltonian_values = [c.scores.get("dca_hamiltonian", 0) for c in candidates]
|
||||
dca_values = [c.scores.get("dca_voice_movement", 0) for c in candidates]
|
||||
target_values = [c.scores.get("target_range", 0) for c in candidates]
|
||||
|
||||
# Helper function for sum normalization
|
||||
def sum_normalize(values: list) -> list | None:
|
||||
"""Normalize values to sum to 1. Returns None if no discrimination."""
|
||||
total = sum(values)
|
||||
if total == 0 or len(set(values)) <= 1:
|
||||
return None
|
||||
return [v / total for v in values]
|
||||
|
||||
# Sum normalize each factor
|
||||
melodic_norm = sum_normalize(melodic_values)
|
||||
contrary_norm = sum_normalize(contrary_values)
|
||||
hamiltonian_norm = sum_normalize(hamiltonian_values)
|
||||
dca_norm = sum_normalize(dca_values)
|
||||
target_norm = sum_normalize(target_values)
|
||||
|
||||
# Calculate weights for each candidate
|
||||
weights = []
|
||||
for i, step in enumerate(candidates):
|
||||
scores = step.scores
|
||||
w = 1.0
|
||||
|
||||
# Hard factors (multiplicative - eliminates if 0)
|
||||
w *= scores.get("direct_tuning", 0)
|
||||
if w == 0:
|
||||
step.weight = 0.0
|
||||
weights.append(0.0)
|
||||
continue
|
||||
|
||||
w *= scores.get("voice_crossing", 0)
|
||||
if w == 0:
|
||||
step.weight = 0.0
|
||||
weights.append(0.0)
|
||||
continue
|
||||
|
||||
# Soft factors (sum normalized, then weighted)
|
||||
if melodic_norm:
|
||||
w += melodic_norm[i] * config.get("weight_melodic", 1)
|
||||
if contrary_norm:
|
||||
w += contrary_norm[i] * config.get("weight_contrary_motion", 0)
|
||||
if hamiltonian_norm:
|
||||
w += hamiltonian_norm[i] * config.get("weight_dca_hamiltonian", 1)
|
||||
if dca_norm:
|
||||
w += dca_norm[i] * config.get("weight_dca_voice_movement", 1)
|
||||
if target_norm:
|
||||
w += target_norm[i] * config.get("weight_target_range", 1)
|
||||
|
||||
step.weight = w
|
||||
weights.append(w)
|
||||
|
||||
return weights
|
||||
|
||||
def _initialize_chords(self, start_chord: "Chord | None") -> tuple:
|
||||
"""Initialize chord sequence."""
|
||||
if start_chord is not None:
|
||||
|
|
@ -271,9 +101,10 @@ class PathFinder:
|
|||
if len(out_edges) == 0:
|
||||
continue
|
||||
|
||||
candidates = self._build_candidates(
|
||||
out_edges, [chord], weights_config, None, None, None, None
|
||||
)
|
||||
path = Path(chord, weights_config)
|
||||
path.init_state(set(self.graph.nodes()), len(chord.pitches), chord)
|
||||
candidates = path.get_candidates(out_edges, [chord])
|
||||
path.compute_weights(candidates, weights_config)
|
||||
nonzero = sum(1 for c in candidates if c.weight > 0)
|
||||
|
||||
if nonzero > 0:
|
||||
|
|
@ -297,222 +128,6 @@ class PathFinder:
|
|||
"target_range_octaves": 2.0,
|
||||
}
|
||||
|
||||
def _factor_melodic_threshold(self, edge_data: dict, config: dict) -> float:
|
||||
"""Returns continuous score based on melodic threshold.
|
||||
|
||||
- cents == 0: score = 1.0 (no movement is always ideal)
|
||||
- Below min (0 < cents < min): score = (cents / min)^3
|
||||
- Within range (min <= cents <= max): score = 1.0
|
||||
- Above max (cents > max): score = ((1200 - cents) / (1200 - max))^3
|
||||
|
||||
Returns product of all voice scores.
|
||||
"""
|
||||
melodic_min = config.get("melodic_threshold_min", 0)
|
||||
melodic_max = config.get("melodic_threshold_max", float("inf"))
|
||||
|
||||
cent_diffs = edge_data.get("cent_diffs", [])
|
||||
|
||||
if not cent_diffs:
|
||||
return 1.0
|
||||
|
||||
product = 1.0
|
||||
for cents in cent_diffs:
|
||||
abs_cents = abs(cents)
|
||||
if abs_cents == 0:
|
||||
score = 1.0
|
||||
elif abs_cents < melodic_min:
|
||||
score = (abs_cents / melodic_min) ** 3
|
||||
elif abs_cents > melodic_max:
|
||||
score = ((1200 - abs_cents) / (1200 - melodic_max)) ** 3
|
||||
else:
|
||||
score = 1.0
|
||||
product *= score
|
||||
|
||||
return product
|
||||
|
||||
def _factor_direct_tuning(self, edge_data: dict, config: dict) -> float:
|
||||
"""Returns 1.0 if directly tunable (or disabled), 0.0 otherwise."""
|
||||
# Check weight - if 0, return 1.0 (neutral)
|
||||
if config.get("weight_direct_tuning", 1) == 0:
|
||||
return 1.0
|
||||
|
||||
if config.get("direct_tuning", True):
|
||||
if edge_data.get("is_directly_tunable", False):
|
||||
return 1.0
|
||||
return 0.0
|
||||
return 1.0 # not configured, neutral
|
||||
|
||||
def _factor_voice_crossing(self, edge_data: dict, config: dict) -> float:
|
||||
"""Returns 1.0 if no voice crossing (or allowed), 0.0 if crossing and not allowed."""
|
||||
if config.get("voice_crossing_allowed", False):
|
||||
return 1.0
|
||||
|
||||
if edge_data.get("voice_crossing", False):
|
||||
return 0.0
|
||||
return 1.0
|
||||
|
||||
def _factor_contrary_motion(self, edge_data: dict, config: dict) -> float:
|
||||
"""Returns factor based on contrary motion.
|
||||
|
||||
Contrary motion: half of moving voices go one direction, half go opposite.
|
||||
Weighted by closeness to ideal half split.
|
||||
factor = 1.0 - (distance_from_half / half)
|
||||
"""
|
||||
if config.get("weight_contrary_motion", 0) == 0:
|
||||
return 1.0
|
||||
|
||||
cent_diffs = edge_data.get("cent_diffs", [])
|
||||
|
||||
num_up = sum(1 for d in cent_diffs if d > 0)
|
||||
num_down = sum(1 for d in cent_diffs if d < 0)
|
||||
num_moving = num_up + num_down
|
||||
|
||||
if num_moving < 2:
|
||||
return 0.0 # Need at least 2 moving voices for contrary motion
|
||||
|
||||
ideal_up = num_moving / 2
|
||||
distance = abs(num_up - ideal_up)
|
||||
return max(0.0, 1.0 - (distance / ideal_up))
|
||||
|
||||
def _factor_dca_hamiltonian(
|
||||
self, edge: tuple, last_visited_counts: dict | None, config: dict
|
||||
) -> float:
|
||||
"""Returns score based on how long since node was last visited.
|
||||
|
||||
DCA Hamiltonian: longer since visited = higher score.
|
||||
"""
|
||||
if config.get("weight_dca_hamiltonian", 1) == 0:
|
||||
return 1.0
|
||||
|
||||
if last_visited_counts is None:
|
||||
return 0.0
|
||||
|
||||
destination = edge[1]
|
||||
visit_count = last_visited_counts.get(destination, 0)
|
||||
|
||||
# Return the visit count - higher is better (more steps since last visit)
|
||||
return float(visit_count)
|
||||
|
||||
def _factor_dca_voice_movement(
|
||||
self,
|
||||
edge: tuple,
|
||||
path: list,
|
||||
sustain_counts: tuple[int, ...] | None,
|
||||
config: dict,
|
||||
cumulative_trans: "Pitch | None",
|
||||
) -> float:
|
||||
"""Returns probability that voices will change.
|
||||
|
||||
DCA = Dissonant Counterpoint Algorithm
|
||||
Probability = (sum of sustain_counts for changing voices) / (sum of ALL sustain_counts)
|
||||
|
||||
Higher probability = more likely to choose edge where long-staying voices change.
|
||||
"""
|
||||
if config.get("weight_dca_voice_movement", 1) == 0:
|
||||
return 1.0
|
||||
|
||||
if sustain_counts is None or len(path) == 0:
|
||||
return 1.0
|
||||
|
||||
if cumulative_trans is None:
|
||||
return 1.0
|
||||
|
||||
num_voices = len(sustain_counts)
|
||||
if num_voices == 0:
|
||||
return 1.0
|
||||
|
||||
current_chord = path[-1]
|
||||
edge_data = edge[2]
|
||||
next_graph_node = edge[1]
|
||||
trans = edge_data.get("transposition")
|
||||
if trans is not None:
|
||||
candidate_transposed = next_graph_node.transpose(
|
||||
cumulative_trans.transpose(trans)
|
||||
)
|
||||
else:
|
||||
candidate_transposed = next_graph_node.transpose(cumulative_trans)
|
||||
|
||||
current_cents = [p.to_cents() for p in current_chord.pitches]
|
||||
candidate_cents = [p.to_cents() for p in candidate_transposed.pitches]
|
||||
|
||||
sum_changing = 0
|
||||
sum_all = sum(sustain_counts)
|
||||
|
||||
if sum_all == 0:
|
||||
return 1.0
|
||||
|
||||
for voice_idx in range(num_voices):
|
||||
if current_cents[voice_idx] != candidate_cents[voice_idx]:
|
||||
sum_changing += sustain_counts[voice_idx]
|
||||
|
||||
return sum_changing / sum_all
|
||||
|
||||
def _factor_target_range(
|
||||
self,
|
||||
edge: tuple,
|
||||
path: list,
|
||||
config: dict,
|
||||
cumulative_trans: "Pitch | None",
|
||||
) -> float:
|
||||
"""Returns factor based on movement toward target.
|
||||
|
||||
Target progresses based on position in path.
|
||||
Uses average cents of current chord for accurate targeting.
|
||||
Factor > 1.0 if moving toward target, < 1.0 if moving away.
|
||||
"""
|
||||
if config.get("weight_target_range", 1) == 0:
|
||||
return 1.0
|
||||
|
||||
if not config.get("target_range", False):
|
||||
return 1.0
|
||||
|
||||
if len(path) == 0 or cumulative_trans is None:
|
||||
return 1.0
|
||||
|
||||
target_octaves = config.get("target_range_octaves", 2.0)
|
||||
max_path = config.get("max_path", 50)
|
||||
target_cents = target_octaves * 1200
|
||||
|
||||
start_avg_cents = sum(p.to_cents() for p in path[0].pitches) / len(
|
||||
path[0].pitches
|
||||
)
|
||||
progress = len(path) / max_path
|
||||
current_target = start_avg_cents + (progress * target_cents)
|
||||
|
||||
current_chord = path[-1]
|
||||
current_avg_cents = sum(p.to_cents() for p in current_chord.pitches) / len(
|
||||
current_chord.pitches
|
||||
)
|
||||
|
||||
edge_data = edge[2]
|
||||
next_graph_node = edge[1]
|
||||
edge_trans = edge_data.get("transposition")
|
||||
if edge_trans is not None:
|
||||
candidate_transposed = next_graph_node.transpose(
|
||||
cumulative_trans.transpose(edge_trans)
|
||||
)
|
||||
else:
|
||||
candidate_transposed = next_graph_node.transpose(cumulative_trans)
|
||||
candidate_avg_cents = sum(
|
||||
p.to_cents() for p in candidate_transposed.pitches
|
||||
) / len(candidate_transposed.pitches)
|
||||
|
||||
if current_target <= 0:
|
||||
return 1.0
|
||||
|
||||
dist_before = abs(current_avg_cents - current_target)
|
||||
dist_after = abs(candidate_avg_cents - current_target)
|
||||
|
||||
if dist_before == 0:
|
||||
return 1.0
|
||||
|
||||
if dist_after < dist_before:
|
||||
return 1.0 + (dist_before - dist_after) / dist_before
|
||||
elif dist_after > dist_before:
|
||||
return max(0.1, 1.0 - (dist_after - dist_before) / dist_before)
|
||||
else:
|
||||
return 1.0
|
||||
|
||||
def is_hamiltonian(self, path: list["Chord"]) -> bool:
|
||||
"""Check if a path is Hamiltonian (visits all nodes exactly once)."""
|
||||
return len(path) == len(self.graph.nodes()) and len(set(path)) == len(path)
|
||||
|
|
|
|||
Loading…
Reference in a new issue