#!/usr/bin/env python """ Path and PathStep classes for storing path state from PathFinder. """ from __future__ import annotations from dataclasses import dataclass, field from typing import Any import math from .pitch import Pitch from .chord import Chord @dataclass class PathStep: """Stores data for a single step (edge) in the path.""" source_node: Chord destination_node: Chord source_chord: Chord destination_chord: Chord transposition: Pitch | None = None movements: dict[int, int] = field(default_factory=dict) scores: dict[str, float] = field(default_factory=dict) normalized_scores: dict[str, float | None] = field(default_factory=dict) weight: float = 0.0 # computed later by compute_weights last_visited_counts_before: dict | None = None last_visited_counts_after: dict | None = None sustain_counts_before: tuple[int, ...] | None = None sustain_counts_after: tuple[int, ...] | None = None new_cumulative_trans: Pitch | None = None new_voice_map: list[int] = field(default_factory=list) edge_data: dict = field(default_factory=dict) class Path: """Stores the complete state of a generated path.""" def __init__( self, initial_chord: Chord | None, weights_config: dict[str, Any] | None = None ): self.initial_chord = initial_chord self.steps: list[PathStep] = [] self.weights_config = weights_config if weights_config is not None else {} # State needed for step computation self._voice_map: list[int] = [] # which voice is at each position self._cumulative_trans: Pitch | None = None # cumulative transposition self._graph_nodes: set = set() # all graph nodes for visit tracking self._num_voices: int = 0 # number of voices def init_state( self, graph_nodes: set, num_voices: int, initial_chord: Chord ) -> None: """Initialize state after graph is known.""" self._graph_nodes = graph_nodes self._num_voices = num_voices self._voice_map = list(range(num_voices)) # voice i at position i dims = initial_chord.dims self._cumulative_trans = Pitch(tuple(0 for _ in range(len(dims))), dims) def get_candidates( self, out_edges: list, path_chords: list[Chord] ) -> list[PathStep]: """Generate candidates from graph edges using current state. Applies current cumulative_trans and voice_map to create destination_chord. Computes all factor scores. """ if not out_edges or not path_chords: return [] source_chord = path_chords[-1] candidates = [] for edge in out_edges: source_node = edge[0] destination_node = edge[1] edge_data = edge[2] trans = edge_data.get("transposition") movement = edge_data.get("movements", {}) # Compute new state after this candidate if trans is not None and self._cumulative_trans is not None: new_cumulative_trans = self._cumulative_trans.transpose(trans) else: new_cumulative_trans = self._cumulative_trans new_voice_map = [None] * len(self._voice_map) for src_idx, dest_idx in movement.items(): new_voice_map[dest_idx] = self._voice_map[src_idx] # Transpose destination node using current cumulative_trans if trans is not None and self._cumulative_trans is not None: transposed = destination_node.transpose( self._cumulative_trans.transpose(trans) ) else: transposed = destination_node # Apply voice map (reorder pitches) reordered_pitches = tuple( transposed.pitches[new_voice_map[i]] for i in range(len(new_voice_map)) ) destination_chord = Chord(reordered_pitches, destination_node.dims) # Compute all factor scores last_visited_before = self._get_last_visited_counts() sustain_before = self._get_sustain_counts() scores = { "direct_tuning": self._factor_direct_tuning( edge_data, self.weights_config ), "voice_crossing": self._factor_voice_crossing( edge_data, self.weights_config ), "melodic_threshold": self._factor_melodic_threshold( edge_data, self.weights_config ), "contrary_motion": self._factor_contrary_motion( edge_data, self.weights_config ), "dca_hamiltonian": self._factor_dca_hamiltonian( destination_node, last_visited_before, self.weights_config ), "dca_voice_movement": self._factor_dca_voice_movement( source_chord, destination_chord, sustain_before, self.weights_config, ), "target_register": self._factor_target_register( path_chords, destination_chord, self.weights_config, ), } # Compute AFTER state for this candidate last_visited_after = dict(last_visited_before) for node in last_visited_after: last_visited_after[node] += 1 last_visited_after[destination_node] = 0 sustain_after = list(sustain_before) for voice_idx in range(len(sustain_after)): curr_cents = source_chord.pitches[voice_idx].to_cents() next_cents = destination_chord.pitches[voice_idx].to_cents() if curr_cents == next_cents: sustain_after[voice_idx] += 1 else: sustain_after[voice_idx] = 0 step = PathStep( source_node=source_node, destination_node=destination_node, source_chord=source_chord, destination_chord=destination_chord, transposition=trans, movements=movement, scores=scores, last_visited_counts_before=last_visited_before, last_visited_counts_after=last_visited_after, sustain_counts_before=sustain_before, sustain_counts_after=tuple(sustain_after), new_cumulative_trans=new_cumulative_trans, new_voice_map=new_voice_map, edge_data=edge_data, ) candidates.append(step) return candidates def compute_weights(self, candidates: list[PathStep], config: dict) -> list[float]: """Compute weights from raw scores for all candidates. Updates each step's weight field. """ if not candidates: return [] melodic_values = [c.scores.get("melodic_threshold", 0) for c in candidates] contrary_values = [c.scores.get("contrary_motion", 0) for c in candidates] hamiltonian_values = [c.scores.get("dca_hamiltonian", 0) for c in candidates] dca_values = [c.scores.get("dca_voice_movement", 0) for c in candidates] target_values = [c.scores.get("target_register", 0) for c in candidates] def sum_normalize(values: list) -> list | None: """Normalize values to sum to 1. Returns None if no discrimination.""" total = sum(values) if total == 0 or len(set(values)) <= 1: return None return [v / total for v in values] melodic_norm = sum_normalize(melodic_values) contrary_norm = sum_normalize(contrary_values) hamiltonian_norm = sum_normalize(hamiltonian_values) dca_norm = sum_normalize(dca_values) target_norm = sum_normalize(target_values) weights = [] for i, step in enumerate(candidates): scores = step.scores w = 1.0 w *= scores.get("direct_tuning", 0) if w == 0: step.weight = 0.0 weights.append(0.0) continue w *= scores.get("voice_crossing", 0) if w == 0: step.weight = 0.0 weights.append(0.0) continue if melodic_norm: w *= melodic_norm[i] * config.get("weight_melodic", 1) if contrary_norm: w *= contrary_norm[i] * config.get("weight_contrary_motion", 0) if hamiltonian_norm: w *= hamiltonian_norm[i] * config.get("weight_dca_hamiltonian", 1) if dca_norm: w *= dca_norm[i] * config.get("weight_dca_voice_movement", 1) if target_norm: w *= target_norm[i] * config.get("weight_target_register", 1) step.weight = w**16 weights.append(w) # Store normalized scores (0-1 range) for influence calculation step.normalized_scores = { "melodic_threshold": melodic_norm[i] if melodic_norm else None, "contrary_motion": contrary_norm[i] if contrary_norm else None, "dca_hamiltonian": hamiltonian_norm[i] if hamiltonian_norm else None, "dca_voice_movement": dca_norm[i] if dca_norm else None, "target_register": target_norm[i] if target_norm else None, } return weights # ========== Factor Methods ========== def _factor_melodic_threshold(self, edge_data: dict, config: dict) -> float: """Returns continuous score based on melodic threshold.""" melodic_min = config.get("melodic_threshold_min", 0) melodic_max = config.get("melodic_threshold_max", float("inf")) cent_diffs = edge_data.get("cent_diffs", []) if not cent_diffs: return 1.0 product = 1.0 for cents in cent_diffs: abs_cents = abs(cents) if abs_cents == 0: score = 1.0 elif abs_cents < melodic_min: score = (abs_cents / melodic_min) ** 3 elif abs_cents > melodic_max: score = ((1200 - abs_cents) / (1200 - melodic_max)) ** 3 else: score = 1.0 product *= score return product def _factor_direct_tuning(self, edge_data: dict, config: dict) -> float: """Returns 1.0 if directly tunable (or disabled), 0.0 otherwise.""" if config.get("weight_direct_tuning", 1) == 0: return 1.0 if config.get("direct_tuning", True): if edge_data.get("is_directly_tunable", False): return 1.0 return 0.0 return 1.0 def _factor_voice_crossing(self, edge_data: dict, config: dict) -> float: """Returns 1.0 if no voice crossing (or allowed), 0.0 if crossing.""" if config.get("voice_crossing_allowed", False): return 1.0 if edge_data.get("voice_crossing", False): return 0.0 return 1.0 def _factor_contrary_motion(self, edge_data: dict, config: dict) -> float: """Returns factor based on contrary motion.""" if config.get("weight_contrary_motion", 0) == 0: return 1.0 cent_diffs = edge_data.get("cent_diffs", []) num_up = sum(1 for d in cent_diffs if d > 0) num_down = sum(1 for d in cent_diffs if d < 0) num_moving = num_up + num_down if num_moving < 2: return 0.0 ideal_up = num_moving / 2 distance = abs(num_up - ideal_up) return max(0.0, 1.0 - (distance / ideal_up)) def _factor_dca_hamiltonian( self, destination_node, last_visited_counts: dict, config: dict ) -> float: """Returns score based on how long since node was last visited.""" if config.get("weight_dca_hamiltonian", 1) == 0: return 1.0 if last_visited_counts is None: return 0.0 visit_count = last_visited_counts.get(destination_node, 0) return float(visit_count) def _factor_dca_voice_movement( self, source_chord: Chord, destination_chord: Chord, sustain_counts: tuple[int, ...], config: dict, ) -> float: """Returns probability that voices will change.""" if config.get("weight_dca_voice_movement", 1) == 0: return 1.0 if sustain_counts is None: return 1.0 num_voices = len(sustain_counts) if num_voices == 0: return 1.0 current_cents = [p.to_cents() for p in source_chord.pitches] candidate_cents = [p.to_cents() for p in destination_chord.pitches] sum_changing = 0 sum_all = sum(sustain_counts) for voice_idx in range(num_voices): if current_cents[voice_idx] != candidate_cents[voice_idx]: sum_changing += sustain_counts[voice_idx] return sum_changing def _factor_target_register( self, path_chords: list[Chord], destination_chord: Chord, config: dict, ) -> float: """Returns factor based on movement toward target.""" if config.get("weight_target_register", 1) == 0: return 1.0 if not config.get("target_register", False): return 1.0 if len(path_chords) == 0: return 1.0 target_octaves = config.get("target_register_octaves", 2.0) max_path = config.get("max_path", 50) target_cents = target_octaves * 1200 power = config.get("target_register_power", 1.0) start_avg_cents = sum(p.to_cents() for p in path_chords[0].pitches) / len( path_chords[0].pitches ) progress = len(path_chords) / max_path progress_curve = progress**power oscillations = config.get("target_register_oscillations", 0) amplitude = config.get("target_register_amplitude", 0.25) if oscillations > 0: sine_wave = math.sin(progress_curve * 2 * math.pi * oscillations) modulation = amplitude * sine_wave progress_curve = max(0, progress_curve + modulation) current_target = start_avg_cents + (progress_curve * target_cents) current_chord = path_chords[-1] current_avg_cents = sum(p.to_cents() for p in current_chord.pitches) / len( current_chord.pitches ) candidate_avg_cents = sum( p.to_cents() for p in destination_chord.pitches ) / len(destination_chord.pitches) dist_before = abs(current_avg_cents - current_target) dist_after = abs(candidate_avg_cents - current_target) if dist_before == 0: return 1.0 if dist_after < dist_before: return 1.0 + (dist_before - dist_after) / dist_before elif dist_after > dist_before: return max(0.1, 1.0 - (dist_after - dist_before) / dist_before) else: return 1.0 # ========== State Methods ========== def _get_last_visited_counts(self) -> dict: """Get last visited counts from the last step, or initialize fresh.""" if self.steps: last_step = self.steps[-1] if last_step.last_visited_counts_after is not None: return dict(last_step.last_visited_counts_after) return {node: 0 for node in self._graph_nodes} def _get_sustain_counts(self) -> tuple: """Get sustain counts from the last step, or initialize fresh.""" if self.steps: last_step = self.steps[-1] if last_step.sustain_counts_after is not None: return last_step.sustain_counts_after return tuple(0 for _ in range(self._num_voices)) def step(self, step: PathStep) -> PathStep: """Commit a chosen candidate to the path. All state was computed in get_candidates(). This just applies the stored new state and commits the step. """ # Apply stored new state self._cumulative_trans = step.new_cumulative_trans self._voice_map = step.new_voice_map # Commit self.steps.append(step) return step @property def graph_chords(self) -> list[Chord]: """Get list of destination graph nodes.""" return [self.initial_chord] + [step.destination_node for step in self.steps] @property def output_chords(self) -> list[Chord]: """Get list of destination chords (transposed).""" return [self.initial_chord] + [step.destination_chord for step in self.steps] def __len__(self) -> int: """Total number of chords in path.""" return len(self.steps) + 1 def __iter__(self): """Iterate over output chords.""" return iter(self.output_chords) def get_influence(self, weights: dict[str, Any]) -> dict[str, float]: """Compute weighted score contribution per factor for chosen candidates. Uses normalized scores (0-1 range) for consistent influence across factors. """ influence = { "melodic": 0.0, "contrary_motion": 0.0, "dca_hamiltonian": 0.0, "dca_voice_movement": 0.0, "target_register": 0.0, } w_melodic = weights.get("weight_melodic", 1) w_contrary = weights.get("weight_contrary_motion", 0) w_hamiltonian = weights.get("weight_dca_hamiltonian", 1) w_dca = weights.get("weight_dca_voice_movement", 1) w_target = weights.get("weight_target_register", 1) for step in self.steps: norm = step.normalized_scores influence["melodic"] += (norm.get("melodic_threshold") or 0) * w_melodic influence["contrary_motion"] += ( norm.get("contrary_motion") or 0 ) * w_contrary influence["dca_hamiltonian"] += ( norm.get("dca_hamiltonian") or 0 ) * w_hamiltonian influence["dca_voice_movement"] += ( norm.get("dca_voice_movement") or 0 ) * w_dca influence["target_register"] += ( norm.get("target_register") or 0 ) * w_target return influence