compact_sets/src/graph.py

380 lines
12 KiB
Python
Raw Normal View History

#!/usr/bin/env python
"""
PathFinder - finds paths through voice leading graphs.
"""
from __future__ import annotations
import networkx as nx
from random import choices, seed
from typing import Iterator
class PathFinder:
"""Finds paths through voice leading graphs."""
def __init__(self, graph: nx.MultiDiGraph):
self.graph = graph
def find_stochastic_path(
self,
start_chord: "Chord | None" = None,
max_length: int = 100,
weights_config: dict | None = None,
) -> list["Chord"]:
"""Find a stochastic path through the graph."""
if weights_config is None:
weights_config = self._default_weights_config()
chord = self._initialize_chords(start_chord)
if not chord or chord[0] is None or len(self.graph.nodes()) == 0:
return []
original_chord = chord[0]
graph_node = original_chord
output_chord = original_chord
path = [output_chord]
last_graph_nodes = (graph_node,)
graph_path = [graph_node]
from .pitch import Pitch
dims = output_chord.dims
cumulative_trans = Pitch(tuple(0 for _ in range(len(dims))), dims)
num_voices = len(output_chord.pitches)
voice_map = list(range(num_voices))
voice_stay_count = [0] * num_voices
for _ in range(max_length):
out_edges = list(self.graph.out_edges(graph_node, data=True))
if not out_edges:
break
weights = self._calculate_edge_weights(
out_edges,
path,
last_graph_nodes,
weights_config,
tuple(voice_stay_count),
graph_path,
cumulative_trans,
)
edge = choices(out_edges, weights=weights)[0]
next_graph_node = edge[1]
trans = edge[2].get("transposition")
movement = edge[2].get("movements", {})
new_voice_map = [None] * num_voices
for src_idx, dest_idx in movement.items():
new_voice_map[dest_idx] = voice_map[src_idx]
voice_map = new_voice_map
if trans is not None:
cumulative_trans = cumulative_trans.transpose(trans)
transposed = next_graph_node.transpose(cumulative_trans)
reordered_pitches = tuple(
transposed.pitches[voice_map[i]] for i in range(num_voices)
)
from .chord import Chord
output_chord = Chord(reordered_pitches, dims)
for voice_idx in range(num_voices):
curr_cents = path[-1].pitches[voice_idx].to_cents()
next_cents = output_chord.pitches[voice_idx].to_cents()
if curr_cents == next_cents:
voice_stay_count[voice_idx] += 1
else:
voice_stay_count[voice_idx] = 0
graph_node = next_graph_node
graph_path.append(graph_node)
path.append(output_chord)
last_graph_nodes = last_graph_nodes + (graph_node,)
if len(last_graph_nodes) > 2:
last_graph_nodes = last_graph_nodes[-2:]
return path
def _initialize_chords(self, start_chord: "Chord | None") -> tuple:
"""Initialize chord sequence."""
if start_chord is not None:
return (start_chord,)
nodes = list(self.graph.nodes())
if nodes:
import random
random.shuffle(nodes)
weights_config = self._default_weights_config()
weights_config["voice_crossing_allowed"] = False
for chord in nodes[:50]:
out_edges = list(self.graph.out_edges(chord, data=True))
if len(out_edges) == 0:
continue
weights = self._calculate_edge_weights(
out_edges, [chord], (chord,), weights_config, None
)
nonzero = sum(1 for w in weights if w > 0)
if nonzero > 0:
return (chord,)
return (nodes[0],)
return (None,)
def _default_weights_config(self) -> dict:
"""Default weights configuration."""
return {
"contrary_motion": True,
"direct_tuning": True,
"voice_crossing_allowed": False,
"melodic_threshold_min": 0,
"melodic_threshold_max": 500,
"hamiltonian": True,
"dca": 2.0,
"target_range": False,
"target_range_octaves": 2.0,
}
def _calculate_edge_weights(
self,
out_edges: list,
path: list["Chord"],
last_chords: tuple["Chord", ...],
config: dict,
voice_stay_count: tuple[int, ...] | None = None,
graph_path: list["Chord"] | None = None,
cumulative_trans: "Pitch | None" = None,
) -> list[float]:
"""Calculate weights for edges based on configuration.
Uses hybrid approach:
- Hard factors (direct tuning, voice crossing): multiplication (eliminate if factor fails)
- Soft factors (melodic, contrary, hamiltonian, dca, target range): weighted sum
"""
weights = []
for edge_idx, edge in enumerate(out_edges):
w = 1.0 # base weight
edge_data = edge[2]
# Hard factors (multiplication - eliminate edge if factor = 0)
# Direct tuning
direct_tuning_factor = self._factor_direct_tuning(edge_data, config)
w *= direct_tuning_factor
if w == 0:
weights.append(0)
continue
# Voice crossing
voice_crossing_factor = self._factor_voice_crossing(edge_data, config)
w *= voice_crossing_factor
if w == 0:
weights.append(0)
continue
# Soft factors (weighted sum)
w += self._factor_melodic_threshold(edge_data, config) * config.get(
"weight_melodic", 1
)
w += self._factor_contrary_motion(edge_data, config) * config.get(
"weight_contrary_motion", 0
)
w += self._factor_hamiltonian(edge, graph_path, config) * config.get(
"weight_hamiltonian", 1
)
w += self._factor_dca(
edge, path, voice_stay_count, config, cumulative_trans
) * config.get("weight_dca", 1)
w += self._factor_target_range(
edge_data, path, config, cumulative_trans
) * config.get("weight_target_range", 1)
weights.append(w)
return weights
def _factor_melodic_threshold(self, edge_data: dict, config: dict) -> float:
"""Returns 1.0 if all voice movements are within melodic threshold, 0.0 otherwise."""
# Check weight - if 0, return 1.0 (neutral)
if config.get("weight_melodic", 1) == 0:
return 1.0
melodic_min = config.get("melodic_threshold_min", 0)
melodic_max = config.get("melodic_threshold_max", float("inf"))
cent_diffs = edge_data.get("cent_diffs", [])
if melodic_min is not None or melodic_max is not None:
for cents in cent_diffs:
if melodic_min is not None and cents < melodic_min:
return 0.0
if melodic_max is not None and cents > melodic_max:
return 0.0
return 1.0
def _factor_direct_tuning(self, edge_data: dict, config: dict) -> float:
"""Returns 1.0 if directly tunable (or disabled), 0.0 otherwise."""
# Check weight - if 0, return 1.0 (neutral)
if config.get("weight_direct_tuning", 1) == 0:
return 1.0
if config.get("direct_tuning", True):
if edge_data.get("is_directly_tunable", False):
return 1.0
return 0.0
return 1.0 # not configured, neutral
def _factor_voice_crossing(self, edge_data: dict, config: dict) -> float:
"""Returns 1.0 if no voice crossing (or allowed), 0.0 if crossing and not allowed."""
if config.get("voice_crossing_allowed", False):
return 1.0
if edge_data.get("voice_crossing", False):
return 0.0
return 1.0
def _factor_contrary_motion(self, edge_data: dict, config: dict) -> float:
"""Returns 1.0 if voices move in contrary motion, 0.0 otherwise."""
# Check weight - if 0, return 1.0 (neutral)
if config.get("weight_contrary_motion", 0) == 0:
return 1.0
if not config.get("contrary_motion", False):
return 1.0 # neutral if not configured
cent_diffs = edge_data.get("cent_diffs", [])
if len(cent_diffs) >= 3:
sorted_diffs = sorted(cent_diffs)
if sorted_diffs[0] < 0 and sorted_diffs[-1] > 0:
return 1.0
return 0.0
def _factor_hamiltonian(
self, edge: tuple, graph_path: list | None, config: dict
) -> float:
"""Returns 1.0 if destination not visited, lower if already visited."""
# Check weight - if 0, return 1.0 (neutral)
if config.get("weight_hamiltonian", 1) == 0:
return 1.0
if not config.get("hamiltonian", False):
return 1.0
destination = edge[1]
if graph_path and destination in graph_path:
return 0.1 # penalize revisiting
return 1.0
def _factor_dca(
self,
edge: tuple,
path: list,
voice_stay_count: tuple[int, ...] | None,
config: dict,
cumulative_trans: "Pitch | None",
) -> float:
"""Returns probability that voices will change.
DCA = Dissonant Counterpoint Algorithm
Probability = (sum of stay_counts for changing voices) / (sum of ALL stay_counts)
Higher probability = more likely to choose edge where long-staying voices change.
"""
if config.get("weight_dca", 1) == 0:
return 1.0
if voice_stay_count is None or len(path) == 0:
return 1.0
if cumulative_trans is None:
return 1.0
num_voices = len(voice_stay_count)
if num_voices == 0:
return 1.0
current_chord = path[-1]
edge_data = edge[2]
next_graph_node = edge[1]
trans = edge_data.get("transposition")
if trans is not None:
candidate_transposed = next_graph_node.transpose(
cumulative_trans.transpose(trans)
)
else:
candidate_transposed = next_graph_node.transpose(cumulative_trans)
current_cents = [p.to_cents() for p in current_chord.pitches]
candidate_cents = [p.to_cents() for p in candidate_transposed.pitches]
sum_changing = 0
sum_all = sum(voice_stay_count)
if sum_all == 0:
return 1.0
for voice_idx in range(num_voices):
if current_cents[voice_idx] != candidate_cents[voice_idx]:
sum_changing += voice_stay_count[voice_idx]
return sum_changing / sum_all
def _factor_target_range(
self,
edge_data: dict,
path: list,
config: dict,
cumulative_trans: "Pitch | None",
) -> float:
"""Returns 1.0 if at target, 0.0 if far from target.
Target progresses based on position in path.
"""
# Check weight - if 0, return 1.0 (neutral)
if config.get("weight_target_range", 1) == 0:
return 1.0
if not config.get("target_range", False):
return 1.0
if len(path) == 0 or cumulative_trans is None:
return 1.0
target_octaves = config.get("target_range_octaves", 2.0)
max_path = config.get("max_path", 50)
target_cents = target_octaves * 1200
progress = len(path) / max_path
current_target = progress * target_cents
edge_trans = edge_data.get("transposition")
new_cumulative = cumulative_trans.transpose(edge_trans)
new_cumulative_cents = new_cumulative.to_cents()
# Closeness: 1.0 if at target, 0.0 if far
if current_target <= 0:
return 1.0
distance = abs(new_cumulative_cents - current_target)
return 1.0 / (1.0 + distance)
def is_hamiltonian(self, path: list["Chord"]) -> bool:
"""Check if a path is Hamiltonian (visits all nodes exactly once)."""
return len(path) == len(self.graph.nodes()) and len(set(path)) == len(path)