#!/usr/bin/env python
"""
PathFinder - finds paths through voice leading graphs.
"""

from __future__ import annotations

from dataclasses import dataclass
import random
from random import choices, seed
from typing import Callable, Iterator

import networkx as nx

from .path import Path


@dataclass
class Candidate:
    """A candidate edge with raw factor scores."""

    edge: tuple  # edge as yielded by graph.out_edges(..., data=True)
    edge_index: int  # position of the edge in the out-edge list it came from
    graph_node: "Chord"  # destination node of the edge (edge[1])
    scores: dict[str, float]  # raw factor scores keyed by factor name
    weight: float = 0.0  # computed later by _compute_weights


class PathFinder:
    """Finds paths through voice leading graphs."""

    # Hard factors: multiplied together; a zero eliminates the candidate.
    _HARD_FACTORS = ("direct_tuning", "voice_crossing")

    # Soft factors, in the order they are added to a candidate's weight:
    # (key in Candidate.scores, config key holding its weight, default weight).
    _SOFT_FACTORS = (
        ("melodic_threshold", "weight_melodic", 1),
        ("contrary_motion", "weight_contrary_motion", 0),
        ("dca_hamiltonian", "weight_dca_hamiltonian", 1),
        ("dca_voice_movement", "weight_dca_voice_movement", 1),
        ("target_range", "weight_target_range", 1),
    )

    # How many shuffled nodes _initialize_chords probes for a usable start.
    _MAX_START_PROBES = 50

    def __init__(self, graph: nx.MultiDiGraph):
        """Store the voice leading graph that paths are drawn from."""
        self.graph = graph

    def find_stochastic_path(
        self,
        start_chord: "Chord | None" = None,
        max_length: int = 100,
        weights_config: dict | None = None,
        callback: Callable[[int, Path, dict], None] | None = None,
        interval: int = 1,
    ) -> Path:
        """Find a stochastic path through the graph.

        At each step every out-edge of the current node is scored, the scores
        are turned into weights, and one edge with a positive weight is drawn
        with ``random.choices``. The walk stops after ``max_length`` steps, at
        a node without out-edges, or when no candidate has a positive weight.

        Args:
            start_chord: Node to start from. When None, a start node is chosen
                by ``_initialize_chords``.
            max_length: Maximum number of steps to take.
            weights_config: Factor configuration. Defaults to
                ``_default_weights_config()``.
            callback: Called as ``callback(step_num, path, weights_config)``
                after every ``interval``-th step.
            interval: Callback spacing in steps. It is used as a modulus, so
                it must be non-zero whenever ``callback`` is given.

        Returns:
            Path object containing output chords, graph chords, and metadata
        """
        if weights_config is None:
            weights_config = self._default_weights_config()

        chord = self._initialize_chords(start_chord)
        if not chord or chord[0] is None or len(self.graph.nodes()) == 0:
            return Path(chord[0] if chord else None, weights_config)

        original_chord = chord[0]
        path_obj = Path(original_chord, weights_config)
        path_obj.init_state(
            set(self.graph.nodes()), len(original_chord.pitches), original_chord
        )

        graph_node = original_chord
        step_num = 0

        for _ in range(max_length):
            out_edges = list(self.graph.out_edges(graph_node, data=True))
            if not out_edges:
                break

            # Derive state from last step (or initialize fresh for step 0)
            if path_obj.steps:
                last_step = path_obj.steps[-1]
                voice_stay_count = last_step.sustain_count_after
                node_visit_counts = last_step.last_visited_count_after
            else:
                # First step: all counters start at zero.
                voice_stay_count = (0,) * len(path_obj._voice_map)
                node_visit_counts = dict.fromkeys(self.graph.nodes(), 0)

            # Build candidates with raw scores
            candidates = self._build_candidates(
                out_edges,
                path_obj.output_chords,
                weights_config,
                voice_stay_count,
                path_obj.graph_chords,
                path_obj._cumulative_trans,
                node_visit_counts,
            )

            # Compute weights from raw scores
            self._compute_weights(candidates, weights_config)

            # Only candidates with a positive weight may be drawn.
            valid_candidates = [c for c in candidates if c.weight > 0]
            if not valid_candidates:
                break

            # Select using weighted choice
            chosen = choices(
                valid_candidates, weights=[c.weight for c in valid_candidates]
            )[0]

            # Path.step() receives the full candidate list (including the
            # eliminated ones); the next iteration reads the resulting state
            # back from path_obj.steps[-1].
            path_obj.step(
                edge=chosen.edge,
                candidates=candidates,
                chosen_scores=chosen.scores,
            )

            graph_node = chosen.graph_node
            step_num += 1

            # Invoke callback if configured
            if callback is not None and step_num % interval == 0:
                callback(step_num, path_obj, weights_config)

        return path_obj

    def _build_candidates(
        self,
        out_edges: list,
        path: list["Chord"],
        config: dict,
        voice_stay_count: tuple[int, ...] | None,
        graph_path: list["Chord"] | None,
        cumulative_trans: "Pitch | None",
        node_visit_counts: dict | None,
    ) -> list["Candidate"]:
        """Build candidates with raw factor scores only.

        Every candidate is created with ``weight=0.0``; call
        ``_compute_weights`` afterwards to fill the weights in.

        Args:
            out_edges: Edges as ``(source, destination, data)`` triples.
            path: Output chords so far; the last one is the current chord.
            config: Factor configuration.
            voice_stay_count: Per-voice counters for the DCA voice factor.
            graph_path: Accepted for interface compatibility; not used here.
            cumulative_trans: Transposition accumulated along the path.
            node_visit_counts: Per-node counters for the DCA Hamiltonian factor.

        Returns:
            One Candidate per edge, in the order of ``out_edges``.
        """
        return [
            Candidate(
                edge,
                index,
                edge[1],
                self._score_edge(
                    edge,
                    path,
                    config,
                    voice_stay_count,
                    cumulative_trans,
                    node_visit_counts,
                ),
                0.0,
            )
            for index, edge in enumerate(out_edges)
        ]

    def _score_edge(
        self,
        edge: tuple,
        path: list["Chord"],
        config: dict,
        voice_stay_count: tuple[int, ...] | None,
        cumulative_trans: "Pitch | None",
        node_visit_counts: dict | None,
    ) -> dict[str, float]:
        """Return the raw score of every factor for a single edge."""
        edge_data = edge[2]
        # All factors are always computed; weighting happens later.
        return {
            "direct_tuning": self._factor_direct_tuning(edge_data, config),
            "voice_crossing": self._factor_voice_crossing(edge_data, config),
            "melodic_threshold": self._factor_melodic_threshold(edge_data, config),
            "contrary_motion": self._factor_contrary_motion(edge_data, config),
            "dca_hamiltonian": self._factor_dca_hamiltonian(
                edge, node_visit_counts, config
            ),
            "dca_voice_movement": self._factor_dca_voice_movement(
                edge, path, voice_stay_count, config, cumulative_trans
            ),
            "target_range": self._factor_target_range(
                edge, path, config, cumulative_trans
            ),
        }

    @staticmethod
    def _sum_normalize(values: list) -> list | None:
        """Normalize values to sum to 1. Returns None if no discrimination.

        "No discrimination" means the values sum to zero or are all equal, in
        which case the factor cannot tell the candidates apart.
        """
        total = sum(values)
        if total == 0 or len(set(values)) <= 1:
            return None
        return [v / total for v in values]

    def _compute_weights(
        self,
        candidates: list["Candidate"],
        config: dict,
    ) -> list[float]:
        """Compute weights from raw scores for all candidates.

        A candidate's weight starts as the product of its hard factors; if
        that product is zero the candidate is eliminated with weight 0.0.
        Otherwise each soft factor that discriminates between candidates adds
        its sum-normalized score times the factor's configured weight.

        Returns a list of weights, and updates each candidate's weight field.
        """
        if not candidates:
            return []

        # One (normalized scores or None, configured weight) pair per soft
        # factor, kept in _SOFT_FACTORS order so the summation order is fixed.
        soft_terms = [
            (
                self._sum_normalize([c.scores.get(score_key, 0) for c in candidates]),
                config.get(weight_key, default_weight),
            )
            for score_key, weight_key, default_weight in self._SOFT_FACTORS
        ]

        weights = []
        for i, candidate in enumerate(candidates):
            w = 1.0

            # Hard factors (multiplicative - eliminates if 0)
            for key in self._HARD_FACTORS:
                w *= candidate.scores.get(key, 0)
                if w == 0:
                    break
            if w == 0:
                candidate.weight = 0.0
                weights.append(0.0)
                continue

            # Soft factors (sum normalized, then weighted)
            for normalized, factor_weight in soft_terms:
                if normalized is not None:
                    w += normalized[i] * factor_weight

            candidate.weight = w
            weights.append(w)

        return weights

    def _initialize_chords(self, start_chord: "Chord | None") -> tuple:
        """Initialize chord sequence.

        Returns a 1-tuple holding the start chord. An explicit ``start_chord``
        is used as is. Otherwise up to ``_MAX_START_PROBES`` randomly ordered
        nodes are probed and the first one with at least one out-edge of
        positive weight (voice crossing disallowed) is returned. If none
        qualifies, the first shuffled node is returned; for an empty graph the
        result is ``(None,)``.
        """
        if start_chord is not None:
            return (start_chord,)

        nodes = list(self.graph.nodes())
        if not nodes:
            return (None,)

        random.shuffle(nodes)
        weights_config = self._default_weights_config()
        weights_config["voice_crossing_allowed"] = False

        for chord in nodes[: self._MAX_START_PROBES]:
            out_edges = list(self.graph.out_edges(chord, data=True))
            if not out_edges:
                continue
            candidates = self._build_candidates(
                out_edges, [chord], weights_config, None, None, None, None
            )
            # _build_candidates leaves every weight at 0.0; the weights must
            # be computed before they can be tested, otherwise no node would
            # ever qualify here.
            self._compute_weights(candidates, weights_config)
            if any(c.weight > 0 for c in candidates):
                return (chord,)

        return (nodes[0],)

    def _default_weights_config(self) -> dict:
        """Default weights configuration."""
        return {
            "contrary_motion": True,
            "direct_tuning": True,
            "voice_crossing_allowed": False,
            "melodic_threshold_min": 0,
            "melodic_threshold_max": 500,
            "hamiltonian": True,
            "dca": 2.0,
            "target_range": False,
            "target_range_octaves": 2.0,
        }

    def _factor_melodic_threshold(self, edge_data: dict, config: dict) -> float:
        """Returns continuous score based on melodic threshold.

        Per voice, with ``cents`` the absolute movement:

        - cents == 0: score = 1.0 (no movement is always ideal)
        - Below min (0 < cents < min): score = (cents / min)^3
        - Within range (min <= cents <= max): score = 1.0
        - Above max (cents > max): score = ((1200 - cents) / (1200 - max))^3,
          clamped to 0.0 once the movement reaches 1200 cents or when max is
          itself 1200 or more, so the score never goes negative, exceeds 1,
          or divides by zero.

        Returns product of all voice scores (1.0 when the edge has no
        "cent_diffs").
        """
        melodic_min = config.get("melodic_threshold_min", 0)
        melodic_max = config.get("melodic_threshold_max", float("inf"))

        cent_diffs = edge_data.get("cent_diffs", [])
        if not cent_diffs:
            return 1.0

        product = 1.0
        for cents in cent_diffs:
            abs_cents = abs(cents)
            if abs_cents == 0:
                score = 1.0
            elif abs_cents < melodic_min:
                score = (abs_cents / melodic_min) ** 3
            elif abs_cents > melodic_max:
                span = 1200 - melodic_max
                remaining = 1200 - abs_cents
                if span <= 0 or remaining <= 0:
                    # Outside the range in which the falloff is defined.
                    score = 0.0
                else:
                    score = (remaining / span) ** 3
            else:
                score = 1.0
            product *= score

        return product

    def _factor_direct_tuning(self, edge_data: dict, config: dict) -> float:
        """Returns 1.0 if directly tunable (or disabled), 0.0 otherwise."""
        # A zero weight or a disabled flag makes the factor neutral.
        if config.get("weight_direct_tuning", 1) == 0:
            return 1.0
        if not config.get("direct_tuning", True):
            return 1.0
        return 1.0 if edge_data.get("is_directly_tunable", False) else 0.0

    def _factor_voice_crossing(self, edge_data: dict, config: dict) -> float:
        """Returns 1.0 if no voice crossing (or allowed), 0.0 if crossing and not allowed."""
        if config.get("voice_crossing_allowed", False):
            return 1.0
        return 0.0 if edge_data.get("voice_crossing", False) else 1.0

    def _factor_contrary_motion(self, edge_data: dict, config: dict) -> float:
        """Returns factor based on contrary motion.

        Contrary motion: half of moving voices go one direction, half go
        opposite. Weighted by closeness to ideal half split.

            factor = 1.0 - (distance_from_half / half)

        Returns 1.0 when "weight_contrary_motion" is 0 (its default) and 0.0
        when fewer than two voices move.
        """
        if config.get("weight_contrary_motion", 0) == 0:
            return 1.0

        cent_diffs = edge_data.get("cent_diffs", [])
        num_up = sum(1 for d in cent_diffs if d > 0)
        num_down = sum(1 for d in cent_diffs if d < 0)
        num_moving = num_up + num_down

        if num_moving < 2:
            return 0.0  # Need at least 2 moving voices for contrary motion

        ideal_up = num_moving / 2
        distance = abs(num_up - ideal_up)
        return max(0.0, 1.0 - (distance / ideal_up))

    def _factor_dca_hamiltonian(
        self, edge: tuple, node_visit_counts: dict | None, config: dict
    ) -> float:
        """Returns a score that grows with the destination node's counter.

        DCA Hamiltonian: a larger counter gives a higher score. Similar to
        DCA voice movement but for graph nodes.
        NOTE(review): the counters come from the path's
        ``last_visited_count_after``; presumably "steps since last visit" -
        confirm against Path.

        Returns 1.0 (neutral) when the factor's weight is 0, when no counters
        are given, when the destination has no counter, or when all counters
        are zero.
        """
        if config.get("weight_dca_hamiltonian", 1) == 0:
            return 1.0
        if node_visit_counts is None:
            return 1.0

        destination = edge[1]
        if destination not in node_visit_counts:
            return 1.0

        max_count = max(node_visit_counts.values())
        if max_count == 0:
            return 1.0

        # Normalize by max squared - gives stronger discrimination
        return node_visit_counts[destination] / (max_count**2)

    @staticmethod
    def _transpose_candidate(edge: tuple, cumulative_trans: "Pitch") -> "Chord":
        """Return the edge's destination node transposed for comparison.

        The node is transposed by ``cumulative_trans``; when the edge data has
        a "transposition" entry, ``cumulative_trans`` is first transposed by it.
        """
        destination = edge[1]
        edge_trans = edge[2].get("transposition")
        if edge_trans is not None:
            return destination.transpose(cumulative_trans.transpose(edge_trans))
        return destination.transpose(cumulative_trans)

    @staticmethod
    def _mean_cents(chord: "Chord") -> float:
        """Return the average of ``to_cents()`` over the chord's pitches."""
        return sum(p.to_cents() for p in chord.pitches) / len(chord.pitches)

    def _factor_dca_voice_movement(
        self,
        edge: tuple,
        path: list,
        voice_stay_count: tuple[int, ...] | None,
        config: dict,
        cumulative_trans: "Pitch | None",
    ) -> float:
        """Returns probability that voices will change.

        DCA = Dissonant Counterpoint Algorithm
        Probability = (sum of stay_counts for changing voices) / (sum of ALL stay_counts)

        Higher probability = more likely to choose edge where long-staying
        voices change. A voice counts as changing when its cents value in the
        current chord differs from the one in the transposed candidate.

        Returns 1.0 (neutral) when the factor's weight is 0, when any of the
        inputs is missing or empty, or when all stay counts are zero.
        """
        if config.get("weight_dca_voice_movement", 1) == 0:
            return 1.0
        if voice_stay_count is None or len(path) == 0:
            return 1.0
        if cumulative_trans is None:
            return 1.0
        if len(voice_stay_count) == 0:
            return 1.0

        sum_all = sum(voice_stay_count)
        if sum_all == 0:
            return 1.0

        candidate_transposed = self._transpose_candidate(edge, cumulative_trans)
        current_cents = [p.to_cents() for p in path[-1].pitches]
        candidate_cents = [p.to_cents() for p in candidate_transposed.pitches]

        sum_changing = sum(
            stay_count
            for voice_idx, stay_count in enumerate(voice_stay_count)
            if current_cents[voice_idx] != candidate_cents[voice_idx]
        )
        return sum_changing / sum_all

    def _factor_target_range(
        self,
        edge: tuple,
        path: list,
        config: dict,
        cumulative_trans: "Pitch | None",
    ) -> float:
        """Returns factor based on movement toward target.

        Target progresses based on position in path:

            target = start_avg + (len(path) / max_path) * target_octaves * 1200

        Uses average cents of current chord for accurate targeting.
        Factor > 1.0 if moving toward target, < 1.0 (but at least 0.1) if
        moving away.

        Returns 1.0 (neutral) when the factor is disabled or weighted 0, when
        the path is empty or there is no cumulative transposition, when
        "max_path" is not positive, when the target is not positive, or when
        the current chord already sits on the target.
        """
        if config.get("weight_target_range", 1) == 0:
            return 1.0
        if not config.get("target_range", False):
            return 1.0
        if len(path) == 0 or cumulative_trans is None:
            return 1.0

        max_path = config.get("max_path", 50)
        if max_path <= 0:
            # Progress cannot be computed without a positive path length.
            return 1.0

        target_cents = config.get("target_range_octaves", 2.0) * 1200
        progress = len(path) / max_path
        current_target = self._mean_cents(path[0]) + (progress * target_cents)
        if current_target <= 0:
            return 1.0

        dist_before = abs(self._mean_cents(path[-1]) - current_target)
        if dist_before == 0:
            return 1.0

        candidate_transposed = self._transpose_candidate(edge, cumulative_trans)
        dist_after = abs(self._mean_cents(candidate_transposed) - current_target)

        if dist_after < dist_before:
            return 1.0 + (dist_before - dist_after) / dist_before
        if dist_after > dist_before:
            return max(0.1, 1.0 - (dist_after - dist_before) / dist_before)
        return 1.0

    def is_hamiltonian(self, path: list["Chord"]) -> bool:
        """Check if a path is Hamiltonian (visits all nodes exactly once).

        The check compares sizes only: the path must be as long as the graph
        has nodes and must contain no repeated chord.
        """
        return len(path) == len(self.graph.nodes()) and len(set(path)) == len(path)