diff --git a/.gitignore b/.gitignore index 4cbd318..0479f06 100644 --- a/.gitignore +++ b/.gitignore @@ -1,7 +1,11 @@ __pycache__/ *.pyc -output_*.json -output_*.txt .venv/ venv/ .pytest_cache/ +ruff_cache/ + +# Generated outputs +cache/ +output/ + diff --git a/compact_sets.py b/compact_sets.py index 8a5d0f6..f8e6c6a 100644 --- a/compact_sets.py +++ b/compact_sets.py @@ -1,1179 +1,5 @@ #!/usr/bin/env python -""" -Compact Sets: A rational theory of harmony - -Based on Michael Winter's theory of conjunct connected sets in harmonic space, -combining ideas from Tom Johnson, James Tenney, and Larry Polansky. - -Mathematical foundations: -- Harmonic space: multidimensional lattice where dimensions = prime factors -- Connected sets: chords forming a connected sublattice -- Voice leading graphs: edges based on symmetric difference + melodic thresholds -""" - -from __future__ import annotations -from fractions import Fraction -from itertools import combinations, permutations -from math import prod, log -from operator import add -from random import choices, seed -from typing import Iterator - -import networkx as nx - - -# ============================================================================ -# CONSTANTS -# ============================================================================ - -DIMS_8 = (2, 3, 5, 7, 11, 13, 17, 19) -DIMS_7 = (2, 3, 5, 7, 11, 13, 17) -DIMS_5 = (2, 3, 5, 7, 11) -DIMS_4 = (2, 3, 5, 7) - - -# ============================================================================ -# PITCH -# ============================================================================ - - -class Pitch: - """ - A point in harmonic space. - - Represented as an array of exponents on prime dimensions. - Example: (0, 1, 0, 0) represents 3/2 (perfect fifth) in CHS_7 - """ - - def __init__(self, hs_array: tuple[int, ...], dims: tuple[int, ...] | None = None): - """ - Initialize a pitch from a harmonic series array. - - Args: - hs_array: Tuple of exponents for each prime dimension - dims: Tuple of primes defining the harmonic space (defaults to DIMS_7) - """ - self.hs_array = hs_array - self.dims = dims if dims is not None else DIMS_7 - - def __hash__(self) -> int: - return hash(self.hs_array) - - def __eq__(self, other: object) -> bool: - if not isinstance(other, Pitch): - return NotImplemented - return self.hs_array == other.hs_array - - def __repr__(self) -> str: - return f"Pitch({self.hs_array})" - - def __iter__(self): - return iter(self.hs_array) - - def __len__(self) -> int: - return len(self.hs_array) - - def __getitem__(self, index: int) -> int: - return self.hs_array[index] - - def to_fraction(self) -> Fraction: - """Convert to frequency ratio (e.g., 3/2).""" - return Fraction( - prod(pow(self.dims[d], self.hs_array[d]) for d in range(len(self.dims))) - ) - - def to_cents(self) -> float: - """Convert to cents (relative to 1/1 = 0 cents).""" - fr = self.to_fraction() - return 1200 * log(float(fr), 2) - - def collapse(self) -> Pitch: - """ - Collapse pitch so frequency ratio is in [1, 2). - - This removes octave information, useful for pitch classes. - """ - collapsed = list(self.hs_array) - fr = self.to_fraction() - - if fr < 1: - while fr < 1: - fr *= 2 - collapsed[0] += 1 - elif fr >= 2: - while fr >= 2: - fr /= 2 - collapsed[0] -= 1 - - return Pitch(tuple(collapsed), self.dims) - - def project(self) -> Pitch: - """Project pitch to [1, 2) range - same as collapse.""" - return self.collapse() - - def transpose(self, trans: Pitch) -> Pitch: - """Transpose by another pitch (add exponents element-wise).""" - return Pitch(tuple(map(add, self.hs_array, trans.hs_array)), self.dims) - - def pitch_difference(self, other: Pitch) -> Pitch: - """Calculate the pitch difference (self - other).""" - return Pitch( - tuple(self.hs_array[d] - other.hs_array[d] for d in range(len(self.dims))), - self.dims, - ) - - -# ============================================================================ -# CHORD -# ============================================================================ - - -class Chord: - """ - A set of pitches forming a connected subgraph in harmonic space. - - A chord is a tuple of Pitches. Two chords are equivalent under - transposition if they have the same intervallic structure. - """ - - def __init__(self, pitches: tuple[Pitch, ...], dims: tuple[int, ...] | None = None): - """ - Initialize a chord from a tuple of pitches. - - Args: - pitches: Tuple of Pitch objects - dims: Harmonic space dimensions (defaults to DIMS_7) - """ - self.dims = dims if dims is not None else DIMS_7 - self._pitches = pitches - - def __hash__(self) -> int: - return hash(self._pitches) - - def __eq__(self, other: object) -> bool: - if not isinstance(other, Chord): - return NotImplemented - return self._pitches == other._pitches - - def __repr__(self) -> str: - return f"Chord({self._pitches})" - - def __iter__(self) -> Iterator[Pitch]: - return iter(self._pitches) - - def __len__(self) -> int: - return len(self._pitches) - - def __getitem__(self, index: int) -> Pitch: - return self._pitches[index] - - @property - def pitches(self) -> tuple[Pitch, ...]: - """Get the pitches as a tuple.""" - return self._pitches - - @property - def collapsed_pitches(self) -> set[Pitch]: - """Get all pitches collapsed to pitch class.""" - return set(p.collapse() for p in self._pitches) - - def is_connected(self) -> bool: - """ - Check if the chord forms a connected subgraph in harmonic space. - - A set is connected if every pitch can be reached from every other - by stepping through adjacent pitches (differing by ±1 in one dimension). - """ - if len(self._pitches) <= 1: - return True - - # Build adjacency through single steps - adj = {p: set() for p in self._pitches} - - for i, p1 in enumerate(self._pitches): - for p2 in self._pitches[i + 1 :]: - if self._is_adjacent(p1, p2): - adj[p1].add(p2) - adj[p2].add(p1) - - # BFS from first pitch - visited = {self._pitches[0]} - queue = [self._pitches[0]] - - while queue: - current = queue.pop(0) - for neighbor in adj[current]: - if neighbor not in visited: - visited.add(neighbor) - queue.append(neighbor) - - return len(visited) == len(self._pitches) - - def _is_adjacent(self, p1: Pitch, p2: Pitch) -> bool: - """Check if two pitches are adjacent (differ by ±1 in exactly one dimension). - - For collapsed harmonic space, skip dimension 0 (the 2/octave dimension). - """ - diff_count = 0 - # Start from dimension 1 (skip dimension 0 = octave in CHS) - for d in range(1, len(self.dims)): - diff = abs(p1[d] - p2[d]) - if diff > 1: - return False - if diff == 1: - diff_count += 1 - return diff_count == 1 - - def symmetric_difference_size(self, other: Chord) -> int: - """Calculate the size of symmetric difference between two chords.""" - set1 = set(p.collapse() for p in self._pitches) - set2 = set(p.collapse() for p in other._pitches) - return len(set1.symmetric_difference(set2)) - - def size_difference(self, other: Chord) -> int: - """Calculate the absolute difference in chord sizes.""" - return abs(len(self._pitches) - len(other._pitches)) - - def project_all(self) -> list[Pitch]: - """Project all pitches to [1, 2) range.""" - return [p.project() for p in self._pitches] - - def transpose(self, trans: Pitch) -> Chord: - """Transpose the entire chord.""" - return Chord(tuple(p.transpose(trans) for p in self._pitches), self.dims) - - def sorted_by_frequency(self) -> list[Pitch]: - """Sort pitches by frequency (low to high).""" - return sorted(self._pitches, key=lambda p: p.to_fraction()) - - -# ============================================================================ -# HARMONIC SPACE -# ============================================================================ - - -class HarmonicSpace: - """ - Harmonic space HS_l or collapsed harmonic space CHS_l. - - A multidimensional lattice where each dimension corresponds to a prime factor. - """ - - def __init__(self, dims: tuple[int, ...] = DIMS_7, collapsed: bool = True): - """ - Initialize harmonic space. - - Args: - dims: Tuple of primes defining the space (e.g., (2, 3, 5, 7)) - collapsed: If True, use collapsed harmonic space (CHS_l) - """ - self.dims = dims - self.collapsed = collapsed - - def __repr__(self) -> str: - suffix = " (collapsed)" if self.collapsed else "" - return f"HarmonicSpace({self.dims}{suffix})" - - def pitch(self, hs_array: tuple[int, ...]) -> Pitch: - """Create a Pitch in this space.""" - return Pitch(hs_array, self.dims) - - def chord(self, pitches: tuple[Pitch, ...]) -> Chord: - """Create a Chord in this space.""" - return Chord(pitches, self.dims) - - def root(self) -> Pitch: - """Get the root pitch (1/1).""" - return self.pitch(tuple(0 for _ in self.dims)) - - def _branch_from(self, vertex: tuple[int, ...]) -> set[tuple[int, ...]]: - """ - Get all vertices adjacent to the given vertex. - - For collapsed harmonic space, skip dimension 0 (the octave dimension). - """ - branches = set() - - # Skip dimension 0 (octave) in collapsed harmonic space - start_dim = 1 if self.collapsed else 0 - - for i in range(start_dim, len(self.dims)): - for delta in (-1, 1): - branch = list(vertex) - branch[i] += delta - branches.add(tuple(branch)) - - return branches - - def generate_connected_sets( - self, min_size: int, max_size: int, collapsed: bool = True - ) -> set[Chord]: - """ - Generate all unique connected sets of a given size. - - Args: - min_size: Minimum number of pitches in a chord - max_size: Maximum number of pitches in a chord - collapsed: If True, use CHS (skip dim 0 in branching). - If False (default), include dim 0 in branching. - - Returns: - Set of unique Chord objects - """ - root = tuple(0 for _ in self.dims) - - def branch_from(vertex): - """Get adjacent vertices. Skip dim 0 for CHS.""" - branches = set() - start_dim = 1 if collapsed else 0 - for i in range(start_dim, len(self.dims)): - for delta in (-1, 1): - branch = list(vertex) - branch[i] += delta - branches.add(tuple(branch)) - return branches - - def grow( - chord: tuple[tuple[int, ...], ...], - connected: set[tuple[int, ...]], - visited: set[tuple[int, ...]], - ) -> Iterator[tuple[tuple[int, ...], ...]]: - """Recursively grow connected sets.""" - # Yield if within size bounds - if min_size <= len(chord) <= max_size: - # If collapsed=True, project each pitch to [1,2) - if collapsed: - projected = [] - for arr in chord: - p = self.pitch(arr) - projected.append(p.project().hs_array) - yield tuple(projected) - else: - yield chord - - # Continue growing if not at max size - if len(chord) < max_size: - visited = set(visited) - for b in connected: - if b not in visited: - extended = chord + (b,) - new_connected = connected | branch_from(b) - visited.add(b) - yield from grow(extended, new_connected, visited) - - # Start generation from root - connected = branch_from(root) - visited = {root} - - results = set() - for chord_arrays in grow((root,), connected, visited): - pitches = tuple(self.pitch(arr) for arr in chord_arrays) - # Sort by frequency after projection (lowest to highest = bass to soprano) - sorted_pitches = tuple(sorted(pitches, key=lambda p: p.to_fraction())) - results.add(Chord(sorted_pitches, self.dims)) - - return results - - def build_voice_leading_graph( - self, - chords: set[Chord], - symdiff_min: int = 2, - symdiff_max: int = 2, - ) -> nx.MultiDiGraph: - """ - Build a voice leading graph from a set of chords. - - Args: - chords: Set of Chord objects - symdiff_min: Minimum symmetric difference between chords - symdiff_max: Maximum symmetric difference between chords - - Returns: - NetworkX MultiDiGraph - """ - symdiff_range = (symdiff_min, symdiff_max) - - graph = nx.MultiDiGraph() - - # Add all chords as nodes - for chord in chords: - graph.add_node(chord) - - # Add edges based on local morphological constraints - for c1, c2 in combinations(chords, 2): - edges = self._find_valid_edges(c1, c2, symdiff_range) - for edge_data in edges: - ( - trans, - weight, - movements, - cent_diffs, - voice_crossing, - is_directly_tunable, - ) = edge_data - graph.add_edge( - c1, - c2, - transposition=trans, - weight=weight, - movements=movements, - cent_diffs=cent_diffs, - voice_crossing=voice_crossing, - is_directly_tunable=is_directly_tunable, - ) - graph.add_edge( - c2, - c1, - transposition=self._invert_transposition(trans), - weight=weight, - movements=self._reverse_movements(movements), - cent_diffs=list( - reversed(cent_diffs) - ), # reverse for opposite direction - voice_crossing=voice_crossing, # same in reverse - is_directly_tunable=is_directly_tunable, - ) - - return graph - - def _reverse_movements(self, movements: dict) -> dict: - """Reverse the movement mappings (index to index).""" - reversed_movements = {} - for src_idx, dest_idx in movements.items(): - reversed_movements[dest_idx] = src_idx - return reversed_movements - - def _is_directly_tunable( - self, - c1_pitches: tuple[Pitch, ...], - c2_transposed_pitches: tuple[Pitch, ...], - movements: dict, - ) -> bool: - """ - Check if all changing pitches are adjacent (directly tunable) to a staying pitch. - - A changing pitch is directly tunable if it differs from a staying pitch - by exactly one prime dimension (±1 in one dimension, 0 in all others). - """ - # Find staying pitches (where movement is identity: i -> i) - staying_indices = [i for i in range(len(c1_pitches)) if movements.get(i) == i] - - if not staying_indices: - return False # No staying pitch to tune to - - # Find changing pitches - changing_indices = [ - i for i in range(len(c1_pitches)) if i not in staying_indices - ] - - if not changing_indices: - return True # No changing pitches = directly tunable - - # For each changing pitch, check if it's adjacent to any staying pitch - for ch_idx in changing_indices: - ch_pitch = c2_transposed_pitches[ch_idx] - is_adjacent_to_staying = False - - for st_idx in staying_indices: - st_pitch = c1_pitches[st_idx] - if self._is_adjacent_pitches(st_pitch, ch_pitch): - is_adjacent_to_staying = True - break - - if not is_adjacent_to_staying: - return False - - return True - - def _find_valid_edges( - self, - c1: Chord, - c2: Chord, - symdiff_range: tuple[int, int], - ) -> list[tuple[Pitch, float, dict, list[float], bool, bool]]: - """ - Find all valid edges between two chords. - - Tests all transpositions of c2 to find ones that satisfy - the symmetric difference constraint AND each changing pitch - is connected (adjacent) to a pitch in the previous chord. - - Returns: - List of (transposition, weight, movements, cent_diffs, voice_crossing, is_directly_tunable) tuples. - - movements: dict {src_idx: dest_idx} - - cent_diffs: list of cent differences per voice - - voice_crossing: True if voices cross - - is_directly_tunable: True if all changing pitches adjacent to staying pitch - """ - edges = [] - - # Get unique transpositions first (fast deduplication) - transpositions = { - p1.pitch_difference(p2) for p1 in c1.pitches for p2 in c2.pitches - } - - # Try each unique transposition - for trans in transpositions: - # Transpose c2 - c2_transposed = c2.transpose(trans) - - # Check symmetric difference - symdiff = self._calc_symdiff(c1, c2_transposed) - - if not (symdiff_range[0] <= symdiff <= symdiff_range[1]): - continue - - # CRITICAL: Each changing pitch must be connected to a pitch in c1 - voice_lead_ok = self._check_voice_leading_connectivity(c1, c2_transposed) - - if not voice_lead_ok: - continue - - # Build all valid movement maps (one per permutation of changing pitches) - movement_maps = self._build_movement_maps(c1.pitches, c2_transposed.pitches) - - # Create one edge per movement map with computed edge properties - for movements in movement_maps: - # Compute cent_diffs for each voice - cent_diffs = [] - for src_idx, dest_idx in movements.items(): - src_pitch = c1.pitches[src_idx] - dst_pitch = c2_transposed.pitches[dest_idx] - cents = abs(src_pitch.to_cents() - dst_pitch.to_cents()) - cent_diffs.append(cents) - - # Check voice_crossing: since source is sorted (bass-to-soprano), - # check if destination is also sorted after applying movement - # If any adjacent pair in destination is out of order, voices crossed - source = list(c1.pitches) - - # Rearrange destination pitches by movement map - destination = [None] * len(source) - for src_idx, dest_idx in movements.items(): - destination[dest_idx] = c2_transposed.pitches[src_idx] - - # Check if destination is still in ascending order (sorted) - # Since source is sorted ascending, if destination is not sorted, voices crossed - voice_crossing = False - for i in range(len(destination) - 1): - if destination[i] is None or destination[i + 1] is None: - continue - if destination[i].to_fraction() >= destination[i + 1].to_fraction(): - voice_crossing = True - break - - # Check is_directly_tunable: changing pitches are adjacent to staying pitch - is_directly_tunable = self._is_directly_tunable( - c1.pitches, c2_transposed.pitches, movements - ) - - edges.append( - ( - trans, - 1.0, - movements, - cent_diffs, - voice_crossing, - is_directly_tunable, - ) - ) - - return edges - - def _build_movement_maps( - self, c1_pitches: tuple[Pitch, ...], c2_transposed_pitches: tuple[Pitch, ...] - ) -> list[dict]: - """ - Build all valid movement maps for c1 -> c2_transposed. - - A movement map shows which position in c1 maps to which position in c2. - - Returns: - List of movement maps. Each map is {src_idx: dest_idx} - There may be multiple valid maps if multiple changing pitches can be permuted. - """ - # Find common pitches (same pitch class in both) - c1_collapsed = [p.collapse() for p in c1_pitches] - c2_collapsed = [p.collapse() for p in c2_transposed_pitches] - - common_indices_c1 = [] - common_indices_c2 = [] - for i, pc1 in enumerate(c1_collapsed): - for j, pc2 in enumerate(c2_collapsed): - if pc1 == pc2: - common_indices_c1.append(i) - common_indices_c2.append(j) - break - - # Get changing pitch indices - changing_indices_c1 = [ - i for i in range(len(c1_pitches)) if i not in common_indices_c1 - ] - changing_indices_c2 = [ - i for i in range(len(c2_transposed_pitches)) if i not in common_indices_c2 - ] - - # Build base map for common pitches: index -> index - base_map = {} - for i in common_indices_c1: - dest_idx = common_indices_c2[common_indices_c1.index(i)] - base_map[i] = dest_idx - - # If no changing pitches, return just the base map - if not changing_indices_c1: - return [base_map] - - # For changing pitches, find all valid permutations - # Each changing pitch in c2 must be adjacent to some pitch in c1 - c1_changing = [c1_pitches[i] for i in changing_indices_c1] - c2_changing = [c2_transposed_pitches[i] for i in changing_indices_c2] - - # Find valid pairings: which c1 pitch can map to which c2 pitch (must be adjacent) - valid_pairings = [] - for p1 in c1_changing: - pairings = [] - for p2 in c2_changing: - if self._is_adjacent_pitches(p1, p2): - cents = abs(p1.to_cents() - p2.to_cents()) - pairings.append((p1, p2, cents)) - valid_pairings.append(pairings) - - # Generate all permutations and filter valid ones - all_maps = [] - num_changing = len(c2_changing) - - # For each permutation of c2_changing indices - for perm in permutations(range(num_changing)): - new_map = dict(base_map) # Start with common pitches - - valid = True - for i, c1_idx in enumerate(changing_indices_c1): - dest_idx = changing_indices_c2[perm[i]] - new_map[c1_idx] = dest_idx - - if valid: - all_maps.append(new_map) - - return all_maps - - def _calc_symdiff(self, c1: Chord, c2: Chord) -> int: - """Calculate symmetric difference between two chords. - - Uses the pitches directly. - """ - set1 = set(c1.pitches) - set2 = set(c2.pitches) - return len(set1.symmetric_difference(set2)) - - def _check_voice_leading_connectivity(self, c1: Chord, c2: Chord) -> bool: - """ - Check that each pitch that changes is connected (adjacent in lattice) - to some pitch in the previous chord. - - Uses transposed pitches directly without collapsing. - """ - # Use pitches directly (transposed form) - c1_pitches = set(c1.pitches) - c2_pitches = set(c2.pitches) - - # Find pitches that change - common = c1_pitches & c2_pitches - changing = c2_pitches - c1_pitches - - if not changing: - return False # No change = no edge - - # For each changing pitch, check if it's adjacent to any pitch in c1 - for p2 in changing: - is_adjacent = False - for p1 in c1_pitches: - if self._is_adjacent_pitches(p1, p2): - is_adjacent = True - break - if not is_adjacent: - return False # A changing pitch is not connected - - return True - - def _is_adjacent_pitches(self, p1: Pitch, p2: Pitch) -> bool: - """Check if two collapsed pitches are adjacent (differ by ±1 in one dimension). - - For collapsed harmonic space, skip dimension 0 (the octave dimension). - """ - diff_count = 0 - # Skip dimension 0 (octave) in CHS - for d in range(1, len(self.dims)): - diff = abs(p1[d] - p2[d]) - if diff > 1: - return False - if diff == 1: - diff_count += 1 - return diff_count == 1 - - def _invert_transposition(self, trans: Pitch) -> Pitch: - """Invert a transposition.""" - return Pitch(tuple(-t for t in trans.hs_array), self.dims) - - -# ============================================================================ -# PATH FINDER -# ============================================================================ - - -class PathFinder: - """Finds paths through voice leading graphs.""" - - def __init__(self, graph: nx.MultiDiGraph): - self.graph = graph - - def find_stochastic_path( - self, - start_chord: Chord | None = None, - max_length: int = 100, - weights_config: dict | None = None, - ) -> list[Chord]: - """ - Find a stochastic path through the graph. - - Args: - start_chord: Starting chord (random if None) - max_length: Maximum path length - weights_config: Configuration for edge weighting - - Returns: - List of Chord objects representing the path - """ - if weights_config is None: - weights_config = self._default_weights_config() - - # Initialize - chord = self._initialize_chords(start_chord) - if not chord or chord[0] is None or len(self.graph.nodes()) == 0: - return [] - - original_chord = chord[0] - - # The graph node to use for edge lookup (original chord from harmonic space) - graph_node = original_chord - - # The output chord for path - output_chord = original_chord - - path = [output_chord] - last_graph_nodes = (graph_node,) - - # Track graph nodes separately for Hamiltonian check (untransposed) - graph_path = [graph_node] - - # Track cumulative transposition across all steps - # Start with identity (zero transposition) - dims = output_chord.dims - cumulative_trans = Pitch(tuple(0 for _ in range(len(dims))), dims) - - # Track voice mapping: voice_map[i] = which original voice is at position i - # Start with identity - num_voices = len(output_chord.pitches) - voice_map = list(range(num_voices)) - - # Track how long each voice position has stayed (not moved) - voice_stay_count = [0] * num_voices - - for _ in range(max_length): - # Find edges from the graph node (original chord) - out_edges = list(self.graph.out_edges(graph_node, data=True)) - - if not out_edges: - break - - # Calculate weights for each edge - weights = self._calculate_edge_weights( - out_edges, - path, - last_graph_nodes, - weights_config, - tuple(voice_stay_count), - graph_path, - ) - - # Select edge stochastically - edge = choices(out_edges, weights=weights)[0] - next_graph_node = edge[1] # This is also an original chord from the graph - trans = edge[2].get("transposition") - movement = edge[2].get("movements", {}) - - # Update voice stay counts before applying new movement - # A voice "stays" if its destination index equals its source index - for src_idx, dest_idx in movement.items(): - if src_idx == dest_idx: - voice_stay_count[src_idx] += 1 - else: - voice_stay_count[src_idx] = 0 # Voice moved - reset count - - # Compose voice mapping with movement map - # movement: src_idx -> dest_idx (voice at src moves to dest) - # voice_map: position i -> original voice - # new_voice_map[dest_idx] = voice_map[src_idx] - new_voice_map = [None] * num_voices - for src_idx, dest_idx in movement.items(): - new_voice_map[dest_idx] = voice_map[src_idx] - voice_map = new_voice_map - - # Add this edge's transposition to cumulative - if trans is not None: - cumulative_trans = cumulative_trans.transpose(trans) - - # Get transposed chord - transposed = next_graph_node.transpose(cumulative_trans) - - # Reorder pitches according to voice mapping - # voice_map[i] = which original voice is at position i - reordered_pitches = tuple( - transposed.pitches[voice_map[i]] for i in range(num_voices) - ) - output_chord = Chord(reordered_pitches, dims) - - # Move to next graph node - graph_node = next_graph_node - - # Track graph nodes for Hamiltonian check - graph_path.append(graph_node) - - path.append(output_chord) - last_graph_nodes = last_graph_nodes + (graph_node,) - if len(last_graph_nodes) > 2: - last_graph_nodes = last_graph_nodes[-2:] - - return path - - def _initialize_chords(self, start_chord: Chord | None) -> tuple: - """Initialize chord sequence. - - Returns: - Tuple of (original_chord,) - """ - if start_chord is not None: - return (start_chord,) - - # Random start - try multiple times to find a chord with valid edges (respecting voice crossing) - nodes = list(self.graph.nodes()) - if nodes: - # Shuffle and try a few - import random - - random.shuffle(nodes) - - # Get default weights config for voice crossing check - weights_config = self._default_weights_config() - weights_config["voice_crossing_allowed"] = False - - for chord in nodes[:50]: # Try up to 50 random chords - out_edges = list(self.graph.out_edges(chord, data=True)) - if len(out_edges) == 0: - continue - - # Test if any edges are valid with voice crossing check - weights = self._calculate_edge_weights( - out_edges, [chord], (chord,), weights_config, None - ) - nonzero = sum(1 for w in weights if w > 0) - - if nonzero > 0: - # Found a valid starting chord - return (chord,) - - # Fall back to first node if none found - return (nodes[0],) - - return (None,) - - def _default_weights_config(self) -> dict: - """Default weights configuration.""" - return { - "contrary_motion": True, - "direct_tuning": True, - "voice_crossing_allowed": False, # False = reject edges with voice crossing - "melodic_threshold_min": 0, - "melodic_threshold_max": 500, - "hamiltonian": True, # Favor unvisited nodes - "dca": 2.0, # Direct Connected Adjacent: boost for moving stagnant voices - } - - def _calculate_edge_weights( - self, - out_edges: list, - path: list[Chord], - last_chords: tuple[Chord, ...], - config: dict, - voice_stay_count: tuple[int, ...] | None = None, - graph_path: list[Chord] | None = None, - ) -> list[float]: - """Calculate weights for edges based on configuration.""" - weights = [] - - # Get DCA (Dissonant Counterpoint Algorithm) settings - dca_multiplier = config.get("dca", 0) - if dca_multiplier is None: - dca_multiplier = 0 - - # Get melodic threshold settings - melodic_min = config.get("melodic_threshold_min", 0) - melodic_max = config.get("melodic_threshold_max", float("inf")) - - for edge in out_edges: - w = 1.0 - edge_data = edge[2] - - # Read pre-computed edge properties from graph - cent_diffs = edge_data.get("cent_diffs", []) - voice_crossing = edge_data.get("voice_crossing", False) - is_directly_tunable = edge_data.get("is_directly_tunable", False) - - # Melodic threshold check: ALL movements must be within min/max range - if melodic_min is not None or melodic_max is not None: - all_within_range = True - for cents in cent_diffs: - if melodic_min is not None and cents < melodic_min: - all_within_range = False - break - if melodic_max is not None and cents > melodic_max: - all_within_range = False - break - - if all_within_range: - w *= 10 # Boost for within range - else: - w = 0.0 # Penalty for outside range - - if w == 0.0: - weights.append(w) - continue - - # Contrary motion weight - if config.get("contrary_motion", False): - if len(cent_diffs) >= 3: - sorted_diffs = sorted(cent_diffs) - if sorted_diffs[0] < 0 and sorted_diffs[-1] > 0: - w *= 100 - - # Direct tuning weight - if config.get("direct_tuning", False): - if is_directly_tunable: - w *= 10 - - # Voice crossing check - use precomputed from graph - # Since all chords in graph are now sorted (bass-to-soprano), this should work correctly - if not config.get("voice_crossing_allowed", False): - if edge_data.get("voice_crossing", False): - w = 0.0 # Reject edges with voice crossing - - # Hamiltonian weight - favor unvisited nodes - if config.get("hamiltonian", False): - destination = edge[1] - # Use graph_path (untransposed) for Hamiltonian check - if graph_path and destination in graph_path: - w *= 0.1 # Penalize revisiting nodes - else: - w *= 10 # Boost for unvisited nodes - - # DCA (Dissonant Counterpoint Algorithm) - boost for moving stagnant voices - if dca_multiplier > 0 and voice_stay_count is not None and len(path) > 0: - # Determine which voices would move in this edge - source_chord = path[-1] - movements = edge_data.get("movements", {}) - - # Calculate stay count boost: which voices would move and what's their stay count - move_boost = 1.0 - for voice_idx in range(len(voice_stay_count)): - # Check if this voice moves (dest != src) - # movements maps src_idx -> dest_idx - if voice_idx in movements: - dest_idx = movements[voice_idx] - if dest_idx != voice_idx: - # Voice is moving - use its stay count as boost - stay_count = voice_stay_count[voice_idx] - move_boost *= dca_multiplier**stay_count - - w *= move_boost - - weights.append(w) - - return weights - - def is_hamiltonian(self, path: list[Chord]) -> bool: - """Check if a path is Hamiltonian (visits all nodes exactly once).""" - return len(path) == len(self.graph.nodes()) and len(set(path)) == len(path) - - -# ============================================================================ -# I/O -# ============================================================================ - - -def write_chord_sequence(seq: list[Chord], path: str) -> None: - """Write a chord sequence to a JSON file.""" - import json - - # Convert to serializable format - serializable = [] - for chord in seq: - chord_data = [] - for pitch in chord._pitches: - chord_data.append( - { - "hs_array": list(pitch.hs_array), - "fraction": str(pitch.to_fraction()), - "cents": pitch.to_cents(), - } - ) - serializable.append(chord_data) - - # Write with formatting - content = json.dumps(serializable, indent=2) - content = content.replace("[[[", "[\n\t[[") - content = content.replace(", [[", ",\n\t[[") - content = content.replace("]]]", "]]\n]") - - with open(path, "w") as f: - f.write(content) - - -def write_chord_sequence_readable(seq: list[Chord], path: str) -> None: - """Write chord sequence as tuple of hs_arrays - one line per chord.""" - with open(path, "w") as f: - f.write("(\n") - for i, chord in enumerate(seq): - arrays = tuple(p.hs_array for p in chord._pitches) - f.write(f" {arrays},\n") - f.write(")\n") - - -def write_chord_sequence_frequencies( - seq: list[Chord], path: str, fundamental: float = 100.0 -) -> None: - """Write chord sequence as frequencies in Hz - one line per chord.""" - with open(path, "w") as f: - f.write("(\n") - for chord in seq: - freqs = tuple(fundamental * float(p.to_fraction()) for p in chord._pitches) - f.write(f" {freqs},\n") - f.write(")\n") - - -# ============================================================================ -# MAIN / DEMO -# ============================================================================ - - -def main(): - """Demo: Generate compact sets and build graph.""" - import argparse - - parser = argparse.ArgumentParser( - description="Generate chord paths in harmonic space" - ) - parser.add_argument( - "--symdiff-min", - type=int, - default=2, - help="Minimum symmetric difference between chords", - ) - parser.add_argument( - "--symdiff-max", - type=int, - default=2, - help="Maximum symmetric difference between chords", - ) - parser.add_argument( - "--melodic-min", - type=int, - default=0, - help="Minimum cents for any pitch movement (0 = no minimum)", - ) - parser.add_argument( - "--melodic-max", - type=int, - default=500, - help="Maximum cents for any pitch movement (0 = no maximum)", - ) - parser.add_argument( - "--dca", - type=float, - default=2.0, - help="DCA (Dissonant Counterpoint Algorithm) multiplier for voice momentum (0 to disable)", - ) - parser.add_argument( - "--allow-voice-crossing", - action="store_true", - help="Allow edges where voices cross (default: reject)", - ) - parser.add_argument( - "--dims", type=int, default=7, help="Number of prime dimensions (4, 5, 7, or 8)" - ) - parser.add_argument("--chord-size", type=int, default=3, help="Size of chords") - parser.add_argument("--max-path", type=int, default=50, help="Maximum path length") - parser.add_argument("--seed", type=int, default=42, help="Random seed") - args = parser.parse_args() - - # Select dims based on argument - if args.dims == 4: - dims = DIMS_4 - elif args.dims == 5: - dims = DIMS_5 - elif args.dims == 7: - dims = DIMS_7 - elif args.dims == 8: - dims = DIMS_8 - else: - dims = DIMS_7 - - # Set up harmonic space - space = HarmonicSpace(dims, collapsed=True) - print(f"Space: {space}") - print(f"Symdiff: {args.symdiff_min} to {args.symdiff_max}") - - # Generate connected sets - print("Generating connected sets...") - chords = space.generate_connected_sets( - min_size=args.chord_size, max_size=args.chord_size - ) - print(f"Found {len(chords)} unique chords") - - # Build voice leading graph - print("Building voice leading graph...") - graph = space.build_voice_leading_graph( - chords, - symdiff_min=args.symdiff_min, - symdiff_max=args.symdiff_max, - ) - print(f"Graph: {graph.number_of_nodes()} nodes, {graph.number_of_edges()} edges") - - # Find stochastic path - print("Finding stochastic path...") - path_finder = PathFinder(graph) - seed(args.seed) - - # Set up weights config with melodic thresholds - weights_config = path_finder._default_weights_config() - weights_config["melodic_threshold_min"] = args.melodic_min - weights_config["melodic_threshold_max"] = args.melodic_max - weights_config["dca"] = args.dca - weights_config["voice_crossing_allowed"] = args.allow_voice_crossing - - path = path_finder.find_stochastic_path( - max_length=args.max_path, weights_config=weights_config - ) - print(f"Path length: {len(path)}") - - # Write output - write_chord_sequence(path, "output_chords.json") - print("Written to output_chords.json") - - write_chord_sequence_readable(path, "output_chords.txt") - print("Written to output_chords.txt") - - write_chord_sequence_frequencies(path, "output_frequencies.txt") - print("Written to output_frequencies.txt") - +from src.io import main if __name__ == "__main__": main() diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..680e4fd --- /dev/null +++ b/src/__init__.py @@ -0,0 +1,23 @@ +#!/usr/bin/env python +""" +Compact Sets: A rational theory of harmony + +Based on Michael Winter's theory of conjunct connected sets in harmonic space, +combining ideas from Tom Johnson, James Tenney, and Larry Polansky. +""" + +from .pitch import Pitch, DIMS_4, DIMS_5, DIMS_7, DIMS_8 +from .chord import Chord +from .harmonic_space import HarmonicSpace +from .graph import PathFinder + +__all__ = [ + "Pitch", + "Chord", + "HarmonicSpace", + "PathFinder", + "DIMS_4", + "DIMS_5", + "DIMS_7", + "DIMS_8", +] diff --git a/src/chord.py b/src/chord.py new file mode 100644 index 0000000..c85e50a --- /dev/null +++ b/src/chord.py @@ -0,0 +1,116 @@ +#!/usr/bin/env python +""" +Chord class - a set of pitches forming a connected subgraph in harmonic space. + +A chord is a tuple of Pitches. Two chords are equivalent under +transposition if they have the same intervallic structure. +""" + +from __future__ import annotations +from typing import Iterator + +from .pitch import Pitch + + +class Chord: + def __init__(self, pitches: tuple[Pitch, ...], dims: tuple[int, ...] | None = None): + from .pitch import DIMS_7 + + self.dims = dims if dims is not None else DIMS_7 + self._pitches = pitches + + def __hash__(self) -> int: + return hash(self._pitches) + + def __eq__(self, other: object) -> bool: + if not isinstance(other, Chord): + return NotImplemented + return self._pitches == other._pitches + + def __repr__(self) -> str: + return f"Chord({self._pitches})" + + def __iter__(self) -> Iterator[Pitch]: + return iter(self._pitches) + + def __len__(self) -> int: + return len(self._pitches) + + def __getitem__(self, index: int) -> Pitch: + return self._pitches[index] + + @property + def pitches(self) -> tuple[Pitch, ...]: + """Get the pitches as a tuple.""" + return self._pitches + + @property + def collapsed_pitches(self) -> set[Pitch]: + """Get all pitches collapsed to pitch class.""" + return set(p.collapse() for p in self._pitches) + + def is_connected(self) -> bool: + """ + Check if the chord forms a connected subgraph in harmonic space. + + A set is connected if every pitch can be reached from every other + by stepping through adjacent pitches (differing by ±1 in one dimension). + """ + if len(self._pitches) <= 1: + return True + + adj = {p: set() for p in self._pitches} + + for i, p1 in enumerate(self._pitches): + for p2 in self._pitches[i + 1 :]: + if self._is_adjacent(p1, p2): + adj[p1].add(p2) + adj[p2].add(p1) + + visited = {self._pitches[0]} + queue = [self._pitches[0]] + + while queue: + current = queue.pop(0) + for neighbor in adj[current]: + if neighbor not in visited: + visited.add(neighbor) + queue.append(neighbor) + + return len(visited) == len(self._pitches) + + def _is_adjacent(self, p1: Pitch, p2: Pitch) -> bool: + """Check if two pitches are adjacent (differ by ±1 in exactly one dimension). + + For collapsed harmonic space, skip dimension 0 (the octave dimension). + """ + diff_count = 0 + for d in range(1, len(self.dims)): + diff = abs(p1[d] - p2[d]) + if diff > 1: + return False + if diff == 1: + diff_count += 1 + return diff_count == 1 + + def symmetric_difference_size(self, other: Chord) -> int: + """Calculate the size of symmetric difference between two chords.""" + set1 = set(p.collapse() for p in self._pitches) + set2 = set(p.collapse() for p in other._pitches) + return len(set1.symmetric_difference(set2)) + + def size_difference(self, other: Chord) -> int: + """Calculate the absolute difference in chord sizes.""" + return abs(len(self._pitches) - len(other._pitches)) + + def project_all(self) -> list[Pitch]: + """Project all pitches to [1, 2) range.""" + return [p.project() for p in self._pitches] + + def transpose(self, trans: Pitch) -> Chord: + """Transpose the entire chord.""" + return Chord(tuple(p.transpose(trans) for p in self._pitches), self.dims) + + def sorted_by_frequency(self) -> list[Pitch]: + """Sort pitches by frequency (low to high).""" + return sorted(self._pitches, key=lambda p: p.to_fraction()) diff --git a/src/graph.py b/src/graph.py new file mode 100644 index 0000000..ad3d659 --- /dev/null +++ b/src/graph.py @@ -0,0 +1,234 @@ +#!/usr/bin/env python +""" +PathFinder - finds paths through voice leading graphs. +""" + +from __future__ import annotations +import networkx as nx +from random import choices, seed +from typing import Iterator + + +class PathFinder: + """Finds paths through voice leading graphs.""" + + def __init__(self, graph: nx.MultiDiGraph): + self.graph = graph + + def find_stochastic_path( + self, + start_chord: "Chord | None" = None, + max_length: int = 100, + weights_config: dict | None = None, + ) -> list["Chord"]: + """Find a stochastic path through the graph.""" + if weights_config is None: + weights_config = self._default_weights_config() + + chord = self._initialize_chords(start_chord) + if not chord or chord[0] is None or len(self.graph.nodes()) == 0: + return [] + + original_chord = chord[0] + graph_node = original_chord + output_chord = original_chord + + path = [output_chord] + last_graph_nodes = (graph_node,) + + graph_path = [graph_node] + + from .pitch import Pitch + + dims = output_chord.dims + cumulative_trans = Pitch(tuple(0 for _ in range(len(dims))), dims) + + num_voices = len(output_chord.pitches) + voice_map = list(range(num_voices)) + + voice_stay_count = [0] * num_voices + + for _ in range(max_length): + out_edges = list(self.graph.out_edges(graph_node, data=True)) + + if not out_edges: + break + + weights = self._calculate_edge_weights( + out_edges, + path, + last_graph_nodes, + weights_config, + tuple(voice_stay_count), + graph_path, + ) + + edge = choices(out_edges, weights=weights)[0] + next_graph_node = edge[1] + trans = edge[2].get("transposition") + movement = edge[2].get("movements", {}) + + for src_idx, dest_idx in movement.items(): + if src_idx == dest_idx: + voice_stay_count[src_idx] += 1 + else: + voice_stay_count[src_idx] = 0 + + new_voice_map = [None] * num_voices + for src_idx, dest_idx in movement.items(): + new_voice_map[dest_idx] = voice_map[src_idx] + voice_map = new_voice_map + + if trans is not None: + cumulative_trans = cumulative_trans.transpose(trans) + + transposed = next_graph_node.transpose(cumulative_trans) + + reordered_pitches = tuple( + transposed.pitches[voice_map[i]] for i in range(num_voices) + ) + from .chord import Chord + + output_chord = Chord(reordered_pitches, dims) + + graph_node = next_graph_node + graph_path.append(graph_node) + + path.append(output_chord) + last_graph_nodes = last_graph_nodes + (graph_node,) + if len(last_graph_nodes) > 2: + last_graph_nodes = last_graph_nodes[-2:] + + return path + + def _initialize_chords(self, start_chord: "Chord | None") -> tuple: + """Initialize chord sequence.""" + if start_chord is not None: + return (start_chord,) + + nodes = list(self.graph.nodes()) + if nodes: + import random + + random.shuffle(nodes) + + weights_config = self._default_weights_config() + weights_config["voice_crossing_allowed"] = False + + for chord in nodes[:50]: + out_edges = list(self.graph.out_edges(chord, data=True)) + if len(out_edges) == 0: + continue + + weights = self._calculate_edge_weights( + out_edges, [chord], (chord,), weights_config, None + ) + nonzero = sum(1 for w in weights if w > 0) + + if nonzero > 0: + return (chord,) + + return (nodes[0],) + + return (None,) + + def _default_weights_config(self) -> dict: + """Default weights configuration.""" + return { + "contrary_motion": True, + "direct_tuning": True, + "voice_crossing_allowed": False, + "melodic_threshold_min": 0, + "melodic_threshold_max": 500, + "hamiltonian": True, + "dca": 2.0, + } + + def _calculate_edge_weights( + self, + out_edges: list, + path: list["Chord"], + last_chords: tuple["Chord", ...], + config: dict, + voice_stay_count: tuple[int, ...] | None = None, + graph_path: list["Chord"] | None = None, + ) -> list[float]: + """Calculate weights for edges based on configuration.""" + weights = [] + + dca_multiplier = config.get("dca", 0) + if dca_multiplier is None: + dca_multiplier = 0 + + melodic_min = config.get("melodic_threshold_min", 0) + melodic_max = config.get("melodic_threshold_max", float("inf")) + + for edge in out_edges: + w = 1.0 + edge_data = edge[2] + + cent_diffs = edge_data.get("cent_diffs", []) + voice_crossing = edge_data.get("voice_crossing", False) + is_directly_tunable = edge_data.get("is_directly_tunable", False) + + if melodic_min is not None or melodic_max is not None: + all_within_range = True + for cents in cent_diffs: + if melodic_min is not None and cents < melodic_min: + all_within_range = False + break + if melodic_max is not None and cents > melodic_max: + all_within_range = False + break + + if all_within_range: + w *= 10 + else: + w = 0.0 + + if w == 0.0: + weights.append(w) + continue + + if config.get("contrary_motion", False): + if len(cent_diffs) >= 3: + sorted_diffs = sorted(cent_diffs) + if sorted_diffs[0] < 0 and sorted_diffs[-1] > 0: + w *= 100 + + if config.get("direct_tuning", False): + if is_directly_tunable: + w *= 10 + + if not config.get("voice_crossing_allowed", False): + if edge_data.get("voice_crossing", False): + w = 0.0 + + if config.get("hamiltonian", False): + destination = edge[1] + if graph_path and destination in graph_path: + w *= 0.1 + else: + w *= 10 + + if dca_multiplier > 0 and voice_stay_count is not None and len(path) > 0: + source_chord = path[-1] + movements = edge_data.get("movements", {}) + + move_boost = 1.0 + for voice_idx in range(len(voice_stay_count)): + if voice_idx in movements: + dest_idx = movements[voice_idx] + if dest_idx != voice_idx: + stay_count = voice_stay_count[voice_idx] + move_boost *= dca_multiplier**stay_count + + w *= move_boost + + weights.append(w) + + return weights + + def is_hamiltonian(self, path: list["Chord"]) -> bool: + """Check if a path is Hamiltonian (visits all nodes exactly once).""" + return len(path) == len(self.graph.nodes()) and len(set(path)) == len(path) diff --git a/src/harmonic_space.py b/src/harmonic_space.py new file mode 100644 index 0000000..6563a87 --- /dev/null +++ b/src/harmonic_space.py @@ -0,0 +1,396 @@ +#!/usr/bin/env python +""" +Harmonic space - multidimensional lattice where dimensions = prime factors. + +HS_l = harmonic space with first l primes +CHS_l = collapsed harmonic space +""" + +from __future__ import annotations +from typing import Iterator + +import networkx as nx + +from .pitch import Pitch, DIMS_7 +from .chord import Chord + + +class HarmonicSpace: + """ + Harmonic space HS_l or collapsed harmonic space CHS_l. + + A multidimensional lattice where each dimension corresponds to a prime factor. + """ + + def __init__(self, dims: tuple[int, ...] = DIMS_7, collapsed: bool = True): + self.dims = dims + self.collapsed = collapsed + + def __repr__(self) -> str: + suffix = " (collapsed)" if self.collapsed else "" + return f"HarmonicSpace({self.dims}{suffix})" + + def pitch(self, hs_array: tuple[int, ...]) -> Pitch: + """Create a Pitch in this space.""" + return Pitch(hs_array, self.dims) + + def chord(self, pitches: tuple[Pitch, ...]) -> Chord: + """Create a Chord in this space.""" + return Chord(pitches, self.dims) + + def root(self) -> Pitch: + """Get the root pitch (1/1).""" + return self.pitch(tuple(0 for _ in self.dims)) + + def _branch_from(self, vertex: tuple[int, ...]) -> set[tuple[int, ...]]: + """ + Get all vertices adjacent to the given vertex. + + For collapsed harmonic space, skip dimension 0 (the octave dimension). + """ + branches = set() + start_dim = 1 if self.collapsed else 0 + + for i in range(start_dim, len(self.dims)): + for delta in (-1, 1): + branch = list(vertex) + branch[i] += delta + branches.add(tuple(branch)) + + return branches + + def generate_connected_sets( + self, min_size: int, max_size: int, collapsed: bool = True + ) -> set[Chord]: + """ + Generate all unique connected sets of a given size. + + Args: + min_size: Minimum number of pitches in a chord + max_size: Maximum number of pitches in a chord + collapsed: If True, use CHS (skip dim 0 in branching). + If False (default), include dim 0 in branching. + + Returns: + Set of unique Chord objects + """ + root = tuple(0 for _ in self.dims) + + def branch_from(vertex): + """Get adjacent vertices. Skip dim 0 for CHS.""" + branches = set() + start_dim = 1 if collapsed else 0 + for i in range(start_dim, len(self.dims)): + for delta in (-1, 1): + branch = list(vertex) + branch[i] += delta + branches.add(tuple(branch)) + return branches + + def grow( + chord: tuple[tuple[int, ...], ...], + connected: set[tuple[int, ...]], + visited: set[tuple[int, ...]], + ) -> Iterator[tuple[tuple[int, ...], ...]]: + """Recursively grow connected sets.""" + if min_size <= len(chord) <= max_size: + if collapsed: + projected = [] + for arr in chord: + p = self.pitch(arr) + projected.append(p.project().hs_array) + yield tuple(projected) + else: + yield chord + + if len(chord) < max_size: + visited = set(visited) + for b in connected: + if b not in visited: + extended = chord + (b,) + new_connected = connected | branch_from(b) + visited.add(b) + yield from grow(extended, new_connected, visited) + + connected = branch_from(root) + visited = {root} + + results = set() + for chord_arrays in grow((root,), connected, visited): + pitches = tuple(self.pitch(arr) for arr in chord_arrays) + sorted_pitches = tuple(sorted(pitches, key=lambda p: p.to_fraction())) + results.add(Chord(sorted_pitches, self.dims)) + + return results + + def build_voice_leading_graph( + self, + chords: set[Chord], + symdiff_min: int = 2, + symdiff_max: int = 2, + ) -> nx.MultiDiGraph: + """ + Build a voice leading graph from a set of chords. + + Args: + chords: Set of Chord objects + symdiff_min: Minimum symmetric difference between chords + symdiff_max: Maximum symmetric difference between chords + + Returns: + NetworkX MultiDiGraph + """ + from itertools import combinations + + symdiff_range = (symdiff_min, symdiff_max) + graph = nx.MultiDiGraph() + + for chord in chords: + graph.add_node(chord) + + for c1, c2 in combinations(chords, 2): + edges = self._find_valid_edges(c1, c2, symdiff_range) + for edge_data in edges: + ( + trans, + weight, + movements, + cent_diffs, + voice_crossing, + is_directly_tunable, + ) = edge_data + graph.add_edge( + c1, + c2, + transposition=trans, + weight=weight, + movements=movements, + cent_diffs=cent_diffs, + voice_crossing=voice_crossing, + is_directly_tunable=is_directly_tunable, + ) + graph.add_edge( + c2, + c1, + transposition=self._invert_transposition(trans), + weight=weight, + movements=self._reverse_movements(movements), + cent_diffs=list(reversed(cent_diffs)), + voice_crossing=voice_crossing, + is_directly_tunable=is_directly_tunable, + ) + + return graph + + def _reverse_movements(self, movements: dict) -> dict: + """Reverse the movement mappings (index to index).""" + reversed_movements = {} + for src_idx, dest_idx in movements.items(): + reversed_movements[dest_idx] = src_idx + return reversed_movements + + def _is_directly_tunable( + self, + c1_pitches: tuple[Pitch, ...], + c2_transposed_pitches: tuple[Pitch, ...], + movements: dict, + ) -> bool: + """Check if all changing pitches are adjacent (directly tunable) to a staying pitch.""" + staying_indices = [i for i in range(len(c1_pitches)) if movements.get(i) == i] + + if not staying_indices: + return False + + changing_indices = [ + i for i in range(len(c1_pitches)) if i not in staying_indices + ] + + if not changing_indices: + return True + + for ch_idx in changing_indices: + ch_pitch = c2_transposed_pitches[ch_idx] + is_adjacent_to_staying = False + + for st_idx in staying_indices: + st_pitch = c1_pitches[st_idx] + if self._is_adjacent_pitches(st_pitch, ch_pitch): + is_adjacent_to_staying = True + break + + if not is_adjacent_to_staying: + return False + + return True + + def _find_valid_edges( + self, + c1: Chord, + c2: Chord, + symdiff_range: tuple[int, int], + ) -> list[tuple[Pitch, float, dict, list[float], bool, bool]]: + """Find all valid edges between two chords.""" + from itertools import combinations as iter_combinations + + edges = [] + + transpositions = { + p1.pitch_difference(p2) for p1 in c1.pitches for p2 in c2.pitches + } + + for trans in transpositions: + c2_transposed = c2.transpose(trans) + + symdiff = self._calc_symdiff(c1, c2_transposed) + + if not (symdiff_range[0] <= symdiff <= symdiff_range[1]): + continue + + voice_lead_ok = self._check_voice_leading_connectivity(c1, c2_transposed) + + if not voice_lead_ok: + continue + + movement_maps = self._build_movement_maps(c1.pitches, c2_transposed.pitches) + + for movements in movement_maps: + cent_diffs = [] + for src_idx, dest_idx in movements.items(): + src_pitch = c1.pitches[src_idx] + dst_pitch = c2_transposed.pitches[dest_idx] + cents = abs(src_pitch.to_cents() - dst_pitch.to_cents()) + cent_diffs.append(cents) + + source = list(c1.pitches) + destination = [None] * len(source) + for src_idx, dest_idx in movements.items(): + destination[dest_idx] = c2_transposed.pitches[src_idx] + + voice_crossing = False + for i in range(len(destination) - 1): + if destination[i] is None or destination[i + 1] is None: + continue + if destination[i].to_fraction() >= destination[i + 1].to_fraction(): + voice_crossing = True + break + + is_directly_tunable = self._is_directly_tunable( + c1.pitches, c2_transposed.pitches, movements + ) + + edges.append( + ( + trans, + 1.0, + movements, + cent_diffs, + voice_crossing, + is_directly_tunable, + ) + ) + + return edges + + def _build_movement_maps( + self, c1_pitches: tuple[Pitch, ...], c2_transposed_pitches: tuple[Pitch, ...] + ) -> list[dict]: + """Build all valid movement maps for c1 -> c2_transposed.""" + from itertools import permutations + + c1_collapsed = [p.collapse() for p in c1_pitches] + c2_collapsed = [p.collapse() for p in c2_transposed_pitches] + + common_indices_c1 = [] + common_indices_c2 = [] + for i, pc1 in enumerate(c1_collapsed): + for j, pc2 in enumerate(c2_collapsed): + if pc1 == pc2: + common_indices_c1.append(i) + common_indices_c2.append(j) + break + + changing_indices_c1 = [ + i for i in range(len(c1_pitches)) if i not in common_indices_c1 + ] + changing_indices_c2 = [ + i for i in range(len(c2_transposed_pitches)) if i not in common_indices_c2 + ] + + base_map = {} + for i in common_indices_c1: + dest_idx = common_indices_c2[common_indices_c1.index(i)] + base_map[i] = dest_idx + + if not changing_indices_c1: + return [base_map] + + c1_changing = [c1_pitches[i] for i in changing_indices_c1] + c2_changing = [c2_transposed_pitches[i] for i in changing_indices_c2] + + valid_pairings = [] + for p1 in c1_changing: + pairings = [] + for p2 in c2_changing: + if self._is_adjacent_pitches(p1, p2): + cents = abs(p1.to_cents() - p2.to_cents()) + pairings.append((p1, p2, cents)) + valid_pairings.append(pairings) + + all_maps = [] + num_changing = len(c2_changing) + + for perm in permutations(range(num_changing)): + new_map = dict(base_map) + + valid = True + for i, c1_idx in enumerate(changing_indices_c1): + dest_idx = changing_indices_c2[perm[i]] + new_map[c1_idx] = dest_idx + + if valid: + all_maps.append(new_map) + + return all_maps + + def _calc_symdiff(self, c1: Chord, c2: Chord) -> int: + """Calculate symmetric difference between two chords.""" + set1 = set(c1.pitches) + set2 = set(c2.pitches) + return len(set1.symmetric_difference(set2)) + + def _check_voice_leading_connectivity(self, c1: Chord, c2: Chord) -> bool: + """Check that each pitch that changes is connected (adjacent in lattice) to some pitch in the previous chord.""" + c1_pitches = set(c1.pitches) + c2_pitches = set(c2.pitches) + + common = c1_pitches & c2_pitches + changing = c2_pitches - c1_pitches + + if not changing: + return False + + for p2 in changing: + is_adjacent = False + for p1 in c1_pitches: + if self._is_adjacent_pitches(p1, p2): + is_adjacent = True + break + if not is_adjacent: + return False + + return True + + def _is_adjacent_pitches(self, p1: Pitch, p2: Pitch) -> bool: + """Check if two collapsed pitches are adjacent (differ by ±1 in one dimension).""" + diff_count = 0 + for d in range(1, len(self.dims)): + diff = abs(p1[d] - p2[d]) + if diff > 1: + return False + if diff == 1: + diff_count += 1 + return diff_count == 1 + + def _invert_transposition(self, trans: Pitch) -> Pitch: + """Invert a transposition.""" + return Pitch(tuple(-t for t in trans.hs_array), self.dims) diff --git a/src/io.py b/src/io.py new file mode 100644 index 0000000..22300c6 --- /dev/null +++ b/src/io.py @@ -0,0 +1,419 @@ +#!/usr/bin/env python +""" +I/O functions and CLI main entry point. +""" + +import json +from fractions import Fraction +from pathlib import Path +from random import seed + + +def write_chord_sequence(seq: list["Chord"], path: str) -> None: + """Write a chord sequence to a JSON file.""" + serializable = [] + for chord in seq: + chord_data = [] + for pitch in chord._pitches: + chord_data.append( + { + "hs_array": list(pitch.hs_array), + "fraction": str(pitch.to_fraction()), + "cents": pitch.to_cents(), + } + ) + serializable.append(chord_data) + + content = json.dumps(serializable, indent=2) + content = content.replace("[[[", "[\n\t[[") + content = content.replace(", [[", ",\n\t[[") + content = content.replace("]]]", "]]\n]") + + with open(path, "w") as f: + f.write(content) + + +def write_chord_sequence_readable(seq: list["Chord"], path: str) -> None: + """Write chord sequence as tuple of hs_arrays - one line per chord.""" + with open(path, "w") as f: + f.write("(\n") + for i, chord in enumerate(seq): + arrays = tuple(p.hs_array for p in chord._pitches) + f.write(f" {arrays},\n") + f.write(")\n") + + +def write_chord_sequence_frequencies( + seq: list["Chord"], path: str, fundamental: float = 100.0 +) -> None: + """Write chord sequence as frequencies in Hz - one line per chord.""" + with open(path, "w") as f: + f.write("(\n") + for chord in seq: + freqs = tuple(fundamental * float(p.to_fraction()) for p in chord._pitches) + f.write(f" {freqs},\n") + f.write(")\n") + + +def graph_to_dict(graph: "nx.MultiDiGraph") -> dict: + """Serialize graph to a dict for JSON.""" + from .pitch import Pitch + from .chord import Chord + + nodes = [] + node_to_idx = {} + for idx, chord in enumerate(graph.nodes()): + nodes.append( + { + "pitches": [list(p.hs_array) for p in chord.pitches], + "dims": list(chord.dims), + } + ) + node_to_idx[id(chord)] = idx + + edges = [] + for u, v, data in graph.edges(data=True): + edges.append( + { + "src_idx": node_to_idx[id(u)], + "dst_idx": node_to_idx[id(v)], + "transposition": list( + data.get( + "transposition", Pitch(tuple([0] * len(u.dims)), u.dims) + ).hs_array + ), + "weight": data.get("weight", 1.0), + "movements": {str(k): v for k, v in data.get("movements", {}).items()}, + "cent_diffs": data.get("cent_diffs", []), + "voice_crossing": data.get("voice_crossing", False), + "is_directly_tunable": data.get("is_directly_tunable", False), + } + ) + + return { + "nodes": nodes, + "edges": edges, + } + + +def graph_from_dict(data: dict) -> "nx.MultiDiGraph": + """Deserialize graph from dict.""" + import networkx as nx + from .pitch import Pitch + from .chord import Chord + + nodes = [] + for node_data in data["nodes"]: + pitches = tuple( + Pitch(tuple(arr), tuple(node_data["dims"])) for arr in node_data["pitches"] + ) + nodes.append(Chord(pitches, tuple(node_data["dims"]))) + + graph = nx.MultiDiGraph() + for node in nodes: + graph.add_node(node) + + for edge_data in data["edges"]: + u = nodes[edge_data["src_idx"]] + v = nodes[edge_data["dst_idx"]] + trans = Pitch(tuple(edge_data["transposition"]), u.dims) + movements = {int(k): v for k, v in edge_data["movements"].items()} + + graph.add_edge( + u, + v, + transposition=trans, + weight=edge_data.get("weight", 1.0), + movements=movements, + cent_diffs=edge_data.get("cent_diffs", []), + voice_crossing=edge_data.get("voice_crossing", False), + is_directly_tunable=edge_data.get("is_directly_tunable", False), + ) + + return graph + + +def save_graph_pickle(graph: "nx.MultiDiGraph", path: str) -> None: + """Save graph to pickle file.""" + import pickle + + with open(path, "wb") as f: + pickle.dump(graph, f) + + +def load_graph_pickle(path: str) -> "nx.MultiDiGraph": + """Load graph from pickle file.""" + import pickle + + with open(path, "rb") as f: + return pickle.load(f) + + +def save_graph_json(graph: "nx.MultiDiGraph", path: str) -> None: + """Save graph to JSON file.""" + data = graph_to_dict(graph) + with open(path, "w") as f: + json.dump(data, f, indent=2) + + +def load_graph_json(path: str) -> "nx.MultiDiGraph": + """Load graph from JSON file.""" + import json + + with open(path, "r") as f: + data = json.load(f) + return graph_from_dict(data) + + +def get_cache_key( + dims: int, chord_size: int, symdiff_min: int, symdiff_max: int +) -> str: + """Generate cache key from parameters.""" + return f"d{dims}_n{size}_s{min}-{max}".replace("{size}", str(chord_size)) + + +def load_graph_from_cache( + cache_dir: str, + dims: int, + chord_size: int, + symdiff_min: int, + symdiff_max: int, +) -> tuple["nx.MultiDiGraph | None", bool]: + """ + Try to load graph from cache. + + Returns: + (graph, was_cached): graph if found, False if not found + """ + cache_key = f"d{dims}_n{chord_size}_s{symdiff_min}-{symdiff_max}" + pkl_path = Path(cache_dir) / f"{cache_key}.pkl" + json_path = Path(cache_dir) / f"{cache_key}.json" + + # Try pickle first (faster) + if pkl_path.exists(): + try: + graph = load_graph_pickle(str(pkl_path)) + return graph, True + except Exception as e: + print(f"Warning: Failed to load pickle cache: {e}") + + # Try JSON + if json_path.exists(): + try: + graph = load_graph_json(str(json_path)) + return graph, True + except Exception as e: + print(f"Warning: Failed to load JSON cache: {e}") + + return None, False + + +def save_graph_to_cache( + graph: "nx.MultiDiGraph", + cache_dir: str, + dims: int, + chord_size: int, + symdiff_min: int, + symdiff_max: int, +) -> None: + """Save graph to cache in both pickle and JSON formats.""" + import os + + cache_key = f"d{dims}_n{chord_size}_s{symdiff_min}-{symdiff_max}" + pkl_path = Path(cache_dir) / f"{cache_key}.pkl" + json_path = Path(cache_dir) / f"{cache_key}.json" + + os.makedirs(cache_dir, exist_ok=True) + + # Save both formats + try: + save_graph_pickle(graph, str(pkl_path)) + print(f"Cached to {pkl_path}") + except Exception as e: + print(f"Warning: Failed to save pickle: {e}") + + try: + save_graph_json(graph, str(json_path)) + print(f"Cached to {json_path}") + except Exception as e: + print(f"Warning: Failed to save JSON: {e}") + + +def main(): + """Demo: Generate compact sets and build graph.""" + import argparse + from .pitch import DIMS_4, DIMS_5, DIMS_7, DIMS_8 + from .harmonic_space import HarmonicSpace + from .graph import PathFinder + + parser = argparse.ArgumentParser( + description="Generate chord paths in harmonic space" + ) + parser.add_argument( + "--symdiff-min", + type=int, + default=2, + help="Minimum symmetric difference between chords", + ) + parser.add_argument( + "--symdiff-max", + type=int, + default=2, + help="Maximum symmetric difference between chords", + ) + parser.add_argument( + "--melodic-min", + type=int, + default=0, + help="Minimum cents for any pitch movement (0 = no minimum)", + ) + parser.add_argument( + "--melodic-max", + type=int, + default=500, + help="Maximum cents for any pitch movement (0 = no maximum)", + ) + parser.add_argument( + "--dca", + type=float, + default=2.0, + help="DCA (Dissonant Counterpoint Algorithm) multiplier for voice momentum (0 to disable)", + ) + parser.add_argument( + "--allow-voice-crossing", + action="store_true", + help="Allow edges where voices cross (default: reject)", + ) + parser.add_argument( + "--dims", type=int, default=7, help="Number of prime dimensions (4, 5, 7, or 8)" + ) + parser.add_argument("--chord-size", type=int, default=3, help="Size of chords") + parser.add_argument("--max-path", type=int, default=50, help="Maximum path length") + parser.add_argument( + "--seed", type=int, default=None, help="Random seed (default: random)" + ) + parser.add_argument( + "--cache-dir", + type=str, + default="./cache", + help="Cache directory for graphs", + ) + parser.add_argument( + "--rebuild-cache", + action="store_true", + help="Force rebuild graph (ignore cache)", + ) + parser.add_argument( + "--no-cache", + action="store_true", + help="Disable caching", + ) + parser.add_argument( + "--output-dir", + type=str, + default="output", + help="Output directory for generated files", + ) + args = parser.parse_args() + + # Select dims + if args.dims == 4: + dims = DIMS_4 + elif args.dims == 5: + dims = DIMS_5 + elif args.dims == 7: + dims = DIMS_7 + elif args.dims == 8: + dims = DIMS_8 + else: + dims = DIMS_7 + + space = HarmonicSpace(dims, collapsed=True) + print(f"Space: {space}") + print(f"Symdiff: {args.symdiff_min} to {args.symdiff_max}") + + # Try to load from cache + graph = None + was_cached = False + + if not args.no_cache and not args.rebuild_cache: + graph, was_cached = load_graph_from_cache( + args.cache_dir, + args.dims, + args.chord_size, + args.symdiff_min, + args.symdiff_max, + ) + if was_cached: + print(f"Loaded graph from cache") + print( + f"Graph: {graph.number_of_nodes()} nodes, {graph.number_of_edges()} edges" + ) + + # Build graph if not loaded from cache + if graph is None: + print("Generating connected sets...") + chords = space.generate_connected_sets( + min_size=args.chord_size, max_size=args.chord_size + ) + print(f"Found {len(chords)} unique chords") + + print("Building voice leading graph...") + graph = space.build_voice_leading_graph( + chords, + symdiff_min=args.symdiff_min, + symdiff_max=args.symdiff_max, + ) + print( + f"Graph: {graph.number_of_nodes()} nodes, {graph.number_of_edges()} edges" + ) + + # Save to cache + if not args.no_cache: + save_graph_to_cache( + graph, + args.cache_dir, + args.dims, + args.chord_size, + args.symdiff_min, + args.symdiff_max, + ) + + # Find stochastic path + print("Finding stochastic path...") + path_finder = PathFinder(graph) + if args.seed is not None: + seed(args.seed) + + weights_config = path_finder._default_weights_config() + weights_config["melodic_threshold_min"] = args.melodic_min + weights_config["melodic_threshold_max"] = args.melodic_max + weights_config["dca"] = args.dca + weights_config["voice_crossing_allowed"] = args.allow_voice_crossing + + path = path_finder.find_stochastic_path( + max_length=args.max_path, weights_config=weights_config + ) + print(f"Path length: {len(path)}") + + # Create output directory and write files + import os + + os.makedirs(args.output_dir, exist_ok=True) + + write_chord_sequence(path, os.path.join(args.output_dir, "output_chords.json")) + print(f"Written to {args.output_dir}/output_chords.json") + + write_chord_sequence_readable( + path, os.path.join(args.output_dir, "output_chords.txt") + ) + print(f"Written to {args.output_dir}/output_chords.txt") + + write_chord_sequence_frequencies( + path, os.path.join(args.output_dir, "output_frequencies.txt") + ) + print(f"Written to {args.output_dir}/output_frequencies.txt") + + +if __name__ == "__main__": + main() diff --git a/src/pitch.py b/src/pitch.py new file mode 100644 index 0000000..f7f0805 --- /dev/null +++ b/src/pitch.py @@ -0,0 +1,92 @@ +#!/usr/bin/env python +""" +Pitch class - a point in harmonic space. + +Represented as an array of exponents on prime dimensions. +Example: (0, 1, 0, 0) represents 3/2 (perfect fifth) in CHS_7 +""" + +from __future__ import annotations +from fractions import Fraction +from math import log +from operator import add +from typing import Iterator + +DIMS_8 = (2, 3, 5, 7, 11, 13, 17, 19) +DIMS_7 = (2, 3, 5, 7, 11, 13, 17) +DIMS_5 = (2, 3, 5, 7, 11) +DIMS_4 = (2, 3, 5, 7) + + +class Pitch: + def __init__(self, hs_array: tuple[int, ...], dims: tuple[int, ...] | None = None): + self.hs_array = hs_array + self.dims = dims if dims is not None else DIMS_7 + + def __hash__(self) -> int: + return hash(self.hs_array) + + def __eq__(self, other: object) -> bool: + if not isinstance(other, Pitch): + return NotImplemented + return self.hs_array == other.hs_array + + def __repr__(self) -> str: + return f"Pitch({self.hs_array})" + + def __iter__(self): + return iter(self.hs_array) + + def __len__(self) -> int: + return len(self.hs_array) + + def __getitem__(self, index: int) -> int: + return self.hs_array[index] + + def to_fraction(self) -> Fraction: + """Convert to frequency ratio (e.g., 3/2).""" + from math import prod + + return Fraction( + prod(pow(self.dims[d], self.hs_array[d]) for d in range(len(self.dims))) + ) + + def to_cents(self) -> float: + """Convert to cents (relative to 1/1 = 0 cents).""" + fr = self.to_fraction() + return 1200 * log(float(fr), 2) + + def collapse(self) -> Pitch: + """ + Collapse pitch so frequency ratio is in [1, 2). + + This removes octave information, useful for pitch classes. + """ + collapsed = list(self.hs_array) + fr = self.to_fraction() + + if fr < 1: + while fr < 1: + fr *= 2 + collapsed[0] += 1 + elif fr >= 2: + while fr >= 2: + fr /= 2 + collapsed[0] -= 1 + + return Pitch(tuple(collapsed), self.dims) + + def project(self) -> Pitch: + """Project pitch to [1, 2) range - same as collapse.""" + return self.collapse() + + def transpose(self, trans: Pitch) -> Pitch: + """Transpose by another pitch (add exponents element-wise).""" + return Pitch(tuple(map(add, self.hs_array, trans.hs_array)), self.dims) + + def pitch_difference(self, other: Pitch) -> Pitch: + """Calculate the pitch difference (self - other).""" + return Pitch( + tuple(self.hs_array[d] - other.hs_array[d] for d in range(len(self.dims))), + self.dims, + )