#!/usr/bin/env python """ PathFinder - finds paths through voice leading graphs. """ from __future__ import annotations import networkx as nx from random import choices, seed from typing import Iterator class PathFinder: """Finds paths through voice leading graphs.""" def __init__(self, graph: nx.MultiDiGraph): self.graph = graph def find_stochastic_path( self, start_chord: "Chord | None" = None, max_length: int = 100, weights_config: dict | None = None, ) -> tuple[list["Chord"], list["Chord"]]: """Find a stochastic path through the graph. Returns: Tuple of (path, graph_path) where: - path: list of output Chord objects (transposed) - graph_path: list of original graph Chord objects (untransposed) """ if weights_config is None: weights_config = self._default_weights_config() chord = self._initialize_chords(start_chord) if not chord or chord[0] is None or len(self.graph.nodes()) == 0: return [], [] original_chord = chord[0] graph_node = original_chord output_chord = original_chord path = [output_chord] last_graph_nodes = (graph_node,) graph_path = [graph_node] from .pitch import Pitch dims = output_chord.dims cumulative_trans = Pitch(tuple(0 for _ in range(len(dims))), dims) num_voices = len(output_chord.pitches) voice_map = list(range(num_voices)) voice_stay_count = [0] * num_voices for _ in range(max_length): out_edges = list(self.graph.out_edges(graph_node, data=True)) if not out_edges: break weights = self._calculate_edge_weights( out_edges, path, last_graph_nodes, weights_config, tuple(voice_stay_count), graph_path, cumulative_trans, ) edge = choices(out_edges, weights=weights)[0] next_graph_node = edge[1] trans = edge[2].get("transposition") movement = edge[2].get("movements", {}) new_voice_map = [None] * num_voices for src_idx, dest_idx in movement.items(): new_voice_map[dest_idx] = voice_map[src_idx] voice_map = new_voice_map if trans is not None: cumulative_trans = cumulative_trans.transpose(trans) transposed = next_graph_node.transpose(cumulative_trans) reordered_pitches = tuple( transposed.pitches[voice_map[i]] for i in range(num_voices) ) from .chord import Chord output_chord = Chord(reordered_pitches, dims) for voice_idx in range(num_voices): curr_cents = path[-1].pitches[voice_idx].to_cents() next_cents = output_chord.pitches[voice_idx].to_cents() if curr_cents == next_cents: voice_stay_count[voice_idx] += 1 else: voice_stay_count[voice_idx] = 0 graph_node = next_graph_node graph_path.append(graph_node) path.append(output_chord) last_graph_nodes = last_graph_nodes + (graph_node,) if len(last_graph_nodes) > 2: last_graph_nodes = last_graph_nodes[-2:] return path, graph_path def _initialize_chords(self, start_chord: "Chord | None") -> tuple: """Initialize chord sequence.""" if start_chord is not None: return (start_chord,) nodes = list(self.graph.nodes()) if nodes: import random random.shuffle(nodes) weights_config = self._default_weights_config() weights_config["voice_crossing_allowed"] = False for chord in nodes[:50]: out_edges = list(self.graph.out_edges(chord, data=True)) if len(out_edges) == 0: continue weights = self._calculate_edge_weights( out_edges, [chord], (chord,), weights_config, None ) nonzero = sum(1 for w in weights if w > 0) if nonzero > 0: return (chord,) return (nodes[0],) return (None,) def _default_weights_config(self) -> dict: """Default weights configuration.""" return { "contrary_motion": True, "direct_tuning": True, "voice_crossing_allowed": False, "melodic_threshold_min": 0, "melodic_threshold_max": 500, "hamiltonian": True, "dca": 2.0, "target_range": False, "target_range_octaves": 2.0, } def _calculate_edge_weights( self, out_edges: list, path: list["Chord"], last_chords: tuple["Chord", ...], config: dict, voice_stay_count: tuple[int, ...] | None = None, graph_path: list["Chord"] | None = None, cumulative_trans: "Pitch | None" = None, ) -> list[float]: """Calculate weights for edges based on configuration. Uses hybrid approach: - Hard factors (direct tuning, voice crossing): multiplication (eliminate if factor fails) - Soft factors (melodic, contrary, hamiltonian, dca, target range): weighted sum """ weights = [] for edge_idx, edge in enumerate(out_edges): w = 1.0 # base weight edge_data = edge[2] # Hard factors (multiplication - eliminate edge if factor = 0) # Direct tuning direct_tuning_factor = self._factor_direct_tuning(edge_data, config) w *= direct_tuning_factor if w == 0: weights.append(0) continue # Voice crossing voice_crossing_factor = self._factor_voice_crossing(edge_data, config) w *= voice_crossing_factor if w == 0: weights.append(0) continue # Soft factors (weighted sum) w += self._factor_melodic_threshold(edge_data, config) * config.get( "weight_melodic", 1 ) w += self._factor_contrary_motion(edge_data, config) * config.get( "weight_contrary_motion", 0 ) w += self._factor_hamiltonian(edge, graph_path, config) * config.get( "weight_hamiltonian", 1 ) w += self._factor_dca( edge, path, voice_stay_count, config, cumulative_trans ) * config.get("weight_dca", 1) w += self._factor_target_range( edge, path, config, cumulative_trans ) * config.get("weight_target_range", 1) weights.append(w) return weights def _factor_melodic_threshold(self, edge_data: dict, config: dict) -> float: """Returns 1.0 if all voice movements are within melodic threshold, 0.0 otherwise.""" # Check weight - if 0, return 1.0 (neutral) if config.get("weight_melodic", 1) == 0: return 1.0 melodic_min = config.get("melodic_threshold_min", 0) melodic_max = config.get("melodic_threshold_max", float("inf")) cent_diffs = edge_data.get("cent_diffs", []) if melodic_min is not None or melodic_max is not None: for cents in cent_diffs: if melodic_min is not None and cents < melodic_min: return 0.0 if melodic_max is not None and cents > melodic_max: return 0.0 return 1.0 def _factor_direct_tuning(self, edge_data: dict, config: dict) -> float: """Returns 1.0 if directly tunable (or disabled), 0.0 otherwise.""" # Check weight - if 0, return 1.0 (neutral) if config.get("weight_direct_tuning", 1) == 0: return 1.0 if config.get("direct_tuning", True): if edge_data.get("is_directly_tunable", False): return 1.0 return 0.0 return 1.0 # not configured, neutral def _factor_voice_crossing(self, edge_data: dict, config: dict) -> float: """Returns 1.0 if no voice crossing (or allowed), 0.0 if crossing and not allowed.""" if config.get("voice_crossing_allowed", False): return 1.0 if edge_data.get("voice_crossing", False): return 0.0 return 1.0 def _factor_contrary_motion(self, edge_data: dict, config: dict) -> float: """Returns factor based on contrary motion. Contrary motion: half of moving voices go one direction, half go opposite. Weighted by closeness to ideal half split. factor = 1.0 - (distance_from_half / half) """ if config.get("weight_contrary_motion", 0) == 0: return 1.0 cent_diffs = edge_data.get("cent_diffs", []) num_up = sum(1 for d in cent_diffs if d > 0) num_down = sum(1 for d in cent_diffs if d < 0) num_moving = num_up + num_down if num_moving < 2: return 0.0 # Need at least 2 moving voices for contrary motion ideal_up = num_moving / 2 distance = abs(num_up - ideal_up) return max(0.0, 1.0 - (distance / ideal_up)) def _factor_hamiltonian( self, edge: tuple, graph_path: list | None, config: dict ) -> float: """Returns 1.0 if destination not visited, lower if already visited.""" # Check weight - if 0, return 1.0 (neutral) if config.get("weight_hamiltonian", 1) == 0: return 1.0 if not config.get("hamiltonian", False): return 1.0 destination = edge[1] if graph_path and destination in graph_path: return 0.1 # penalize revisiting return 1.0 def _factor_dca( self, edge: tuple, path: list, voice_stay_count: tuple[int, ...] | None, config: dict, cumulative_trans: "Pitch | None", ) -> float: """Returns probability that voices will change. DCA = Dissonant Counterpoint Algorithm Probability = (sum of stay_counts for changing voices) / (sum of ALL stay_counts) Higher probability = more likely to choose edge where long-staying voices change. """ if config.get("weight_dca", 1) == 0: return 1.0 if voice_stay_count is None or len(path) == 0: return 1.0 if cumulative_trans is None: return 1.0 num_voices = len(voice_stay_count) if num_voices == 0: return 1.0 current_chord = path[-1] edge_data = edge[2] next_graph_node = edge[1] trans = edge_data.get("transposition") if trans is not None: candidate_transposed = next_graph_node.transpose( cumulative_trans.transpose(trans) ) else: candidate_transposed = next_graph_node.transpose(cumulative_trans) current_cents = [p.to_cents() for p in current_chord.pitches] candidate_cents = [p.to_cents() for p in candidate_transposed.pitches] sum_changing = 0 sum_all = sum(voice_stay_count) if sum_all == 0: return 1.0 for voice_idx in range(num_voices): if current_cents[voice_idx] != candidate_cents[voice_idx]: sum_changing += voice_stay_count[voice_idx] return sum_changing / sum_all def _factor_target_range( self, edge: tuple, path: list, config: dict, cumulative_trans: "Pitch | None", ) -> float: """Returns factor based on movement toward target. Target progresses based on position in path. Uses average cents of current chord for accurate targeting. Factor > 1.0 if moving toward target, < 1.0 if moving away. """ if config.get("weight_target_range", 1) == 0: return 1.0 if not config.get("target_range", False): return 1.0 if len(path) == 0 or cumulative_trans is None: return 1.0 target_octaves = config.get("target_range_octaves", 2.0) max_path = config.get("max_path", 50) target_cents = target_octaves * 1200 start_avg_cents = sum(p.to_cents() for p in path[0].pitches) / len( path[0].pitches ) progress = len(path) / max_path current_target = start_avg_cents + (progress * target_cents) current_chord = path[-1] current_avg_cents = sum(p.to_cents() for p in current_chord.pitches) / len( current_chord.pitches ) edge_data = edge[2] next_graph_node = edge[1] edge_trans = edge_data.get("transposition") if edge_trans is not None: candidate_transposed = next_graph_node.transpose( cumulative_trans.transpose(edge_trans) ) else: candidate_transposed = next_graph_node.transpose(cumulative_trans) candidate_avg_cents = sum( p.to_cents() for p in candidate_transposed.pitches ) / len(candidate_transposed.pitches) if current_target <= 0: return 1.0 dist_before = abs(current_avg_cents - current_target) dist_after = abs(candidate_avg_cents - current_target) if dist_before == 0: return 1.0 if dist_after < dist_before: return 1.0 + (dist_before - dist_after) / dist_before elif dist_after > dist_before: return max(0.1, 1.0 - (dist_after - dist_before) / dist_before) else: return 1.0 def is_hamiltonian(self, path: list["Chord"]) -> bool: """Check if a path is Hamiltonian (visits all nodes exactly once).""" return len(path) == len(self.graph.nodes()) and len(set(path)) == len(path)