#!/usr/bin/env python
"""
PathFinder - finds paths through voice leading graphs.
"""

from __future__ import annotations

from random import choices, seed, shuffle
from typing import TYPE_CHECKING, Iterator

import networkx as nx

if TYPE_CHECKING:
    # Annotation-only imports; the runtime imports stay local to
    # find_stochastic_path, exactly where the original code performed them.
    from .chord import Chord
    from .pitch import Pitch


class PathFinder:
    """Finds paths through voice leading graphs."""

    # Upper bound on how many shuffled nodes are probed when looking for a
    # start chord that has at least one usable outgoing edge.
    _START_CANDIDATE_LIMIT = 50

    def __init__(self, graph: nx.MultiDiGraph):
        self.graph = graph

    def find_stochastic_path(
        self,
        start_chord: Chord | None = None,
        max_length: int = 100,
        weights_config: dict | None = None,
    ) -> tuple[list[Chord], list[Chord]]:
        """Find a stochastic path through the graph.

        At every step the outgoing edges of the current graph node are
        weighted by ``_calculate_edge_weights`` and one edge is drawn at
        random. The walk stops after ``max_length`` steps, when the current
        node has no outgoing edges, or when no outgoing edge has a positive
        weight.

        Args:
            start_chord: Chord to start from. When ``None`` a start chord is
                picked by ``_initialize_chords``.
            max_length: Maximum number of steps (edges) to take.
            weights_config: Weighting configuration; defaults to
                ``_default_weights_config()``.

        Returns:
            Tuple of (path, graph_path) where:
            - path: list of output Chord objects (transposed)
            - graph_path: list of original graph Chord objects (untransposed)
            Both lists are empty when the graph has no nodes or no start
            chord could be determined.
        """
        if weights_config is None:
            weights_config = self._default_weights_config()

        start = self._initialize_chords(start_chord)[0]
        if start is None or len(self.graph.nodes()) == 0:
            return [], []

        # Local imports, as in the original code; hoisted out of the loop so
        # they are executed once per call instead of once per step.
        from .chord import Chord
        from .pitch import Pitch

        graph_node = start
        output_chord = start
        path = [output_chord]
        graph_path = [graph_node]
        last_graph_nodes = (graph_node,)

        # Track how long since each node was last visited (for DCA Hamiltonian).
        node_visit_counts = {node: 0 for node in self.graph.nodes()}
        # Mark the start node as just visited. This also registers a start
        # chord that is not a key of the graph's node set.
        node_visit_counts[graph_node] = 0

        dims = output_chord.dims
        cumulative_trans = Pitch((0,) * len(dims), dims)

        num_voices = len(output_chord.pitches)
        voice_map = list(range(num_voices))
        voice_stay_count = [0] * num_voices

        for _ in range(max_length):
            # Every node ages by one step; the node chosen below is reset.
            for node in node_visit_counts:
                node_visit_counts[node] += 1

            out_edges = list(self.graph.out_edges(graph_node, data=True))
            if not out_edges:
                break

            weights = self._calculate_edge_weights(
                out_edges,
                path,
                last_graph_nodes,
                weights_config,
                tuple(voice_stay_count),
                graph_path,
                cumulative_trans,
                node_visit_counts,
            )
            edge = self._select_edge(out_edges, weights)
            if edge is None:
                # Every outgoing edge was eliminated by the hard factors;
                # treat this like a dead end instead of letting
                # random.choices raise on an all-zero weight vector.
                break

            next_graph_node = edge[1]
            edge_data = edge[2]

            voice_map = self._advance_voice_map(
                voice_map, edge_data.get("movements", {})
            )

            trans = edge_data.get("transposition")
            if trans is not None:
                cumulative_trans = cumulative_trans.transpose(trans)

            transposed = next_graph_node.transpose(cumulative_trans)
            output_chord = Chord(
                tuple(transposed.pitches[source] for source in voice_map), dims
            )

            # A voice "stays" when its pitch in cents is unchanged between
            # the previous output chord and the new one.
            previous_chord = path[-1]
            for voice_idx in range(num_voices):
                curr_cents = previous_chord.pitches[voice_idx].to_cents()
                next_cents = output_chord.pitches[voice_idx].to_cents()
                if curr_cents == next_cents:
                    voice_stay_count[voice_idx] += 1
                else:
                    voice_stay_count[voice_idx] = 0

            graph_node = next_graph_node
            graph_path.append(graph_node)

            # Reset visit count for the visited node.
            if next_graph_node in node_visit_counts:
                node_visit_counts[next_graph_node] = 0

            path.append(output_chord)
            # Only the two most recent graph nodes are kept.
            last_graph_nodes = (last_graph_nodes + (graph_node,))[-2:]

        return path, graph_path

    @staticmethod
    def _select_edge(out_edges: list, weights: list[float]) -> tuple | None:
        """Draw one edge at random according to ``weights``.

        Returns ``None`` when no weight is positive, because
        ``random.choices`` cannot draw from an all-zero weight vector.
        """
        if not any(weight > 0 for weight in weights):
            return None
        return choices(out_edges, weights=weights)[0]

    @staticmethod
    def _advance_voice_map(voice_map: list, movement: dict) -> list:
        """Apply an edge's ``movements`` mapping to the current voice map.

        For every ``src -> dest`` entry the new map's ``dest`` slot receives
        the old map's ``src`` entry. An empty mapping leaves the voice map
        unchanged (previously it produced a map of ``None`` entries that
        failed on the next pitch lookup).
        """
        if not movement:
            return voice_map
        new_voice_map = [None] * len(voice_map)
        for src_idx, dest_idx in movement.items():
            new_voice_map[dest_idx] = voice_map[src_idx]
        return new_voice_map

    def _initialize_chords(self, start_chord: Chord | None) -> tuple:
        """Initialize chord sequence.

        Returns a one-element tuple holding the start chord: the given
        ``start_chord`` if any; otherwise the first of up to
        ``_START_CANDIDATE_LIMIT`` randomly ordered nodes that has an
        outgoing edge with positive weight; otherwise the first node of the
        shuffled list; or ``None`` when the graph has no nodes.
        """
        if start_chord is not None:
            return (start_chord,)

        nodes = list(self.graph.nodes())
        if not nodes:
            return (None,)

        shuffle(nodes)
        weights_config = self._default_weights_config()
        weights_config["voice_crossing_allowed"] = False

        for chord in nodes[: self._START_CANDIDATE_LIMIT]:
            out_edges = list(self.graph.out_edges(chord, data=True))
            if not out_edges:
                continue
            weights = self._calculate_edge_weights(
                out_edges, [chord], (chord,), weights_config, None
            )
            if any(weight > 0 for weight in weights):
                return (chord,)

        return (nodes[0],)

    def _default_weights_config(self) -> dict:
        """Default weights configuration."""
        return {
            "contrary_motion": True,
            "direct_tuning": True,
            "voice_crossing_allowed": False,
            "melodic_threshold_min": 0,
            "melodic_threshold_max": 500,
            "hamiltonian": True,
            "dca": 2.0,
            "target_range": False,
            "target_range_octaves": 2.0,
        }

    @staticmethod
    def _sum_normalize(values: list) -> list | None:
        """Normalize values to sum to 1.

        Returns None if there is no discrimination, i.e. the values sum to
        zero or are all identical.
        """
        total = sum(values)
        if total == 0 or len(set(values)) <= 1:
            return None
        return [value / total for value in values]

    def _calculate_edge_weights(
        self,
        out_edges: list,
        path: list[Chord],
        last_chords: tuple[Chord, ...],
        config: dict,
        voice_stay_count: tuple[int, ...] | None = None,
        graph_path: list[Chord] | None = None,
        cumulative_trans: Pitch | None = None,
        node_visit_counts: dict | None = None,
    ) -> list[float]:
        """Calculate weights for edges based on configuration.

        Uses hybrid approach:
        - Hard factors (direct tuning, voice crossing): multiplication
          (eliminate if factor fails)
        - Soft factors: sum normalized per factor, then weighted sum

        Args:
            out_edges: ``(source, destination, data)`` edge tuples.
            path: Output chords produced so far.
            last_chords: Accepted for interface compatibility; not read here.
            config: Weighting configuration.
            voice_stay_count: Per-voice count of consecutive unchanged steps.
            graph_path: Accepted for interface compatibility; not read here.
            cumulative_trans: Transposition accumulated along the path.
            node_visit_counts: Steps since each node was last visited.

        Returns:
            One weight per edge, in the order of ``out_edges``. Edges that
            fail a hard factor get weight 0; all others get a base weight of
            1.0 plus the weighted, normalized soft factors.
        """
        if not out_edges:
            return []

        # First pass: evaluate the hard factors once per edge and collect the
        # raw soft factor values. Eliminated edges contribute 0 to every soft
        # factor so they take part in the normalization like before.
        passes_hard = []
        melodic_values = []
        contrary_values = []
        hamiltonian_values = []
        dca_values = []
        target_values = []

        for edge in out_edges:
            edge_data = edge[2]
            survives = (
                self._factor_direct_tuning(edge_data, config) != 0
                and self._factor_voice_crossing(edge_data, config) != 0
            )
            passes_hard.append(survives)

            if not survives:
                melodic_values.append(0)
                contrary_values.append(0)
                hamiltonian_values.append(0)
                dca_values.append(0)
                target_values.append(0)
                continue

            melodic_values.append(self._factor_melodic_threshold(edge_data, config))
            contrary_values.append(self._factor_contrary_motion(edge_data, config))
            hamiltonian_values.append(
                self._factor_dca_hamiltonian(edge, node_visit_counts, config)
            )
            dca_values.append(
                self._factor_dca_voice_movement(
                    edge, path, voice_stay_count, config, cumulative_trans
                )
            )
            target_values.append(
                self._factor_target_range(edge, path, config, cumulative_trans)
            )

        # (normalized values, configured weight) per soft factor, in the
        # order in which they are added to the edge weight.
        soft_terms = [
            (self._sum_normalize(values), config.get(key, default))
            for key, default, values in (
                ("weight_melodic", 1, melodic_values),
                ("weight_contrary_motion", 0, contrary_values),
                ("weight_dca_hamiltonian", 1, hamiltonian_values),
                ("weight_dca_voice_movement", 1, dca_values),
                ("weight_target_range", 1, target_values),
            )
        ]

        # Second pass: calculate final weights.
        weights = []
        for index, survives in enumerate(passes_hard):
            if not survives:
                weights.append(0.0)
                continue
            weight = 1.0  # base weight
            for normalized, factor_weight in soft_terms:
                # A factor that does not discriminate (None) is skipped.
                if normalized:
                    weight += normalized[index] * factor_weight
            weights.append(weight)

        return weights

    def _factor_melodic_threshold(self, edge_data: dict, config: dict) -> float:
        """Returns 1.0 if all voice movements are within melodic threshold, 0.0 otherwise.

        The size of each movement (absolute value of its entry in
        ``cent_diffs``) is compared against the thresholds, so upward and
        downward movements are treated alike.
        """
        # Check weight - if 0, return 1.0 (neutral)
        if config.get("weight_melodic", 1) == 0:
            return 1.0

        melodic_min = config.get("melodic_threshold_min", 0)
        melodic_max = config.get("melodic_threshold_max", float("inf"))

        for cents in edge_data.get("cent_diffs", []):
            size = abs(cents)
            if melodic_min is not None and size < melodic_min:
                return 0.0
            if melodic_max is not None and size > melodic_max:
                return 0.0
        return 1.0

    def _factor_direct_tuning(self, edge_data: dict, config: dict) -> float:
        """Returns 1.0 if directly tunable (or disabled), 0.0 otherwise."""
        # Check weight - if 0, return 1.0 (neutral)
        if config.get("weight_direct_tuning", 1) == 0:
            return 1.0
        if not config.get("direct_tuning", True):
            return 1.0  # not configured, neutral
        return 1.0 if edge_data.get("is_directly_tunable", False) else 0.0

    def _factor_voice_crossing(self, edge_data: dict, config: dict) -> float:
        """Returns 1.0 if no voice crossing (or allowed), 0.0 if crossing and not allowed."""
        if config.get("voice_crossing_allowed", False):
            return 1.0
        return 0.0 if edge_data.get("voice_crossing", False) else 1.0

    def _factor_contrary_motion(self, edge_data: dict, config: dict) -> float:
        """Returns factor based on contrary motion.

        Contrary motion: half of moving voices go one direction, half go
        opposite. Weighted by closeness to ideal half split.
        factor = 1.0 - (distance_from_half / half)

        Returns 1.0 (neutral) when ``weight_contrary_motion`` is 0 or unset.
        """
        if config.get("weight_contrary_motion", 0) == 0:
            return 1.0

        cent_diffs = edge_data.get("cent_diffs", [])
        num_up = sum(1 for diff in cent_diffs if diff > 0)
        num_down = sum(1 for diff in cent_diffs if diff < 0)
        num_moving = num_up + num_down

        if num_moving < 2:
            return 0.0  # Need at least 2 moving voices for contrary motion

        ideal_up = num_moving / 2
        distance = abs(num_up - ideal_up)
        return max(0.0, 1.0 - (distance / ideal_up))

    def _factor_dca_hamiltonian(
        self, edge: tuple, node_visit_counts: dict | None, config: dict
    ) -> float:
        """Returns probability based on how long since node was last visited.

        DCA Hamiltonian: longer since visited = higher probability.
        Similar to DCA voice movement but for graph nodes.

        The value is the destination's visit count divided by the sum of all
        visit counts; 1.0 (neutral) is returned when the factor is disabled,
        no counts are available, the destination is unknown or the counts
        sum to zero.
        """
        if config.get("weight_dca_hamiltonian", 1) == 0:
            return 1.0
        if node_visit_counts is None:
            return 1.0

        destination = edge[1]
        if destination not in node_visit_counts:
            return 1.0

        total_counts = sum(node_visit_counts.values())
        if total_counts == 0:
            return 1.0
        return node_visit_counts[destination] / total_counts

    @staticmethod
    def _transpose_candidate(edge: tuple, cumulative_trans: Pitch):
        """Return the edge's destination node transposed as if the edge were taken."""
        edge_trans = edge[2].get("transposition")
        if edge_trans is not None:
            return edge[1].transpose(cumulative_trans.transpose(edge_trans))
        return edge[1].transpose(cumulative_trans)

    @staticmethod
    def _average_cents(chord) -> float:
        """Return the mean of ``to_cents()`` over the chord's pitches."""
        return sum(p.to_cents() for p in chord.pitches) / len(chord.pitches)

    def _factor_dca_voice_movement(
        self,
        edge: tuple,
        path: list,
        voice_stay_count: tuple[int, ...] | None,
        config: dict,
        cumulative_trans: Pitch | None,
    ) -> float:
        """Returns probability that voices will change.

        DCA = Dissonant Counterpoint Algorithm
        Probability = (sum of stay_counts for changing voices) / (sum of ALL stay_counts)
        Higher probability = more likely to choose edge where long-staying voices change.

        Returns 1.0 (neutral) when the factor is disabled, when stay counts,
        path or cumulative transposition are missing, or when all stay
        counts are zero.
        """
        if config.get("weight_dca_voice_movement", 1) == 0:
            return 1.0
        if voice_stay_count is None or len(path) == 0:
            return 1.0
        if cumulative_trans is None:
            return 1.0

        num_voices = len(voice_stay_count)
        if num_voices == 0:
            return 1.0

        # NOTE(review): the candidate pitches are compared in the graph
        # node's own voice order; the voice map maintained by
        # find_stochastic_path is not applied here - confirm this is intended.
        candidate = self._transpose_candidate(edge, cumulative_trans)
        current_cents = [p.to_cents() for p in path[-1].pitches]
        candidate_cents = [p.to_cents() for p in candidate.pitches]

        sum_all = sum(voice_stay_count)
        if sum_all == 0:
            return 1.0

        sum_changing = 0
        for voice_idx in range(num_voices):
            if current_cents[voice_idx] != candidate_cents[voice_idx]:
                sum_changing += voice_stay_count[voice_idx]
        return sum_changing / sum_all

    def _factor_target_range(
        self,
        edge: tuple,
        path: list,
        config: dict,
        cumulative_trans: Pitch | None,
    ) -> float:
        """Returns factor based on movement toward target.

        Target progresses based on position in path: it is the average cents
        of the first chord plus ``len(path) / max_path`` of
        ``target_range_octaves`` octaves. Uses average cents of current chord
        for accurate targeting.
        Factor > 1.0 if moving toward target, < 1.0 (but at least 0.1) if
        moving away, 1.0 if neutral or disabled.
        """
        if config.get("weight_target_range", 1) == 0:
            return 1.0
        if not config.get("target_range", False):
            return 1.0
        if len(path) == 0 or cumulative_trans is None:
            return 1.0

        max_path = config.get("max_path", 50)
        if not max_path:
            # Progress is undefined without a path length; stay neutral
            # rather than dividing by zero.
            return 1.0

        target_cents = config.get("target_range_octaves", 2.0) * 1200
        progress = len(path) / max_path
        current_target = self._average_cents(path[0]) + (progress * target_cents)

        current_avg_cents = self._average_cents(path[-1])
        candidate_avg_cents = self._average_cents(
            self._transpose_candidate(edge, cumulative_trans)
        )

        if current_target <= 0:
            return 1.0

        dist_before = abs(current_avg_cents - current_target)
        dist_after = abs(candidate_avg_cents - current_target)

        if dist_before == 0:
            return 1.0
        if dist_after < dist_before:
            return 1.0 + (dist_before - dist_after) / dist_before
        if dist_after > dist_before:
            return max(0.1, 1.0 - (dist_after - dist_before) / dist_before)
        return 1.0

    def is_hamiltonian(self, path: list[Chord]) -> bool:
        """Check if a path is Hamiltonian (visits all nodes exactly once).

        The check compares the path length with the number of graph nodes
        and requires all path entries to be distinct.
        """
        return len(path) == len(self.graph.nodes()) and len(set(path)) == len(path)