compact_sets/compact_sets.py
Michael Winter c44dd60e83 Fix transposition deduplication and rename expand to project
- Deduplicate transpositions in _find_valid_edges using set comprehension
  to avoid processing same transposition multiple times
- Edge count now matches notebook (1414 vs 2828)
- Rename expand() to project() for clarity (project to [1,2) range)
- Fix SyntaxWarnings in docstrings (escape backslashes)
2026-03-13 00:28:34 +01:00

1451 lines
52 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
"""
Compact Sets: A rational theory of harmony
Based on Michael Winter's theory of conjunct connected sets in harmonic space,
combining ideas from Tom Johnson, James Tenney, and Larry Polansky.
Mathematical foundations:
- Harmonic space: multidimensional lattice where dimensions = prime factors
- Connected sets: chords forming a connected sublattice
- Voice leading graphs: edges based on symmetric difference + melodic thresholds
"""
from __future__ import annotations
from fractions import Fraction
from itertools import combinations, permutations, product
from math import prod, log
from operator import add
from random import choice, choices, seed
from typing import Iterator
import networkx as nx
# ============================================================================
# CONSTANTS
# ============================================================================
# Prime axes of the predefined harmonic spaces. The numeric suffix is the
# number of primes (dimensions) in the tuple; index 0 is always the prime 2.
DIMS_8 = (2, 3, 5, 7, 11, 13, 17, 19)
DIMS_7 = (2, 3, 5, 7, 11, 13, 17)  # used as the default when no dims are given
DIMS_5 = (2, 3, 5, 7, 11)
DIMS_4 = (2, 3, 5, 7)
# ============================================================================
# PITCH
# ============================================================================
class Pitch:
    """
    A point in harmonic space.

    A pitch is a tuple of integer exponents, one per prime dimension.
    Example: with dims (2, 3, 5, 7), the array (-1, 1, 0, 0) is the ratio
    3/2 (perfect fifth) and (0, 1, 0, 0) is 3/1, the same pitch class an
    octave higher.

    Equality and hashing look only at the exponent tuple; ``dims`` is not
    compared.
    """

    def __init__(self, hs_array: tuple[int, ...], dims: tuple[int, ...] | None = None):
        """
        Initialize a pitch from an exponent array.

        Args:
            hs_array: Tuple of exponents, one for each prime dimension
            dims: Tuple of primes defining the harmonic space (defaults to DIMS_7)
        """
        self.hs_array = hs_array
        self.dims = dims if dims is not None else DIMS_7

    def __hash__(self) -> int:
        return hash(self.hs_array)

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Pitch):
            return NotImplemented
        return self.hs_array == other.hs_array

    def __repr__(self) -> str:
        return f"Pitch({self.hs_array})"

    def __iter__(self) -> Iterator[int]:
        return iter(self.hs_array)

    def __len__(self) -> int:
        return len(self.hs_array)

    def __getitem__(self, index: int) -> int:
        return self.hs_array[index]

    def to_fraction(self) -> Fraction:
        """
        Convert to an exact frequency ratio (e.g., 3/2).

        Each prime is raised to its exponent as a Fraction. Using the built-in
        ``pow`` on ints would return a float for negative exponents and make
        the resulting ratio an inexact binary approximation.
        """
        return Fraction(
            prod(
                Fraction(self.dims[d]) ** self.hs_array[d]
                for d in range(len(self.dims))
            )
        )

    def to_cents(self) -> float:
        """Convert to cents (relative to 1/1 = 0 cents)."""
        return 1200 * log(float(self.to_fraction()), 2)

    def collapse(self) -> Pitch:
        """
        Collapse pitch so its frequency ratio lies in [1, 2).

        Only the exponent at index 0 is adjusted, one step per halving or
        doubling, so this relies on dims[0] being 2. The result carries no
        octave information and can be used as a pitch class.
        """
        exponents = list(self.hs_array)
        ratio = self.to_fraction()
        # The ratio is a product of positive primes, so it is always > 0 and
        # both loops terminate; at most one of them runs.
        while ratio < 1:
            ratio *= 2
            exponents[0] += 1
        while ratio >= 2:
            ratio /= 2
            exponents[0] -= 1
        return Pitch(tuple(exponents), self.dims)

    def project(self) -> Pitch:
        """Project pitch to the [1, 2) range - same as collapse."""
        return self.collapse()

    def transpose(self, trans: Pitch) -> Pitch:
        """Transpose by another pitch (add exponents element-wise)."""
        return Pitch(tuple(map(add, self.hs_array, trans.hs_array)), self.dims)

    def pitch_difference(self, other: Pitch) -> Pitch:
        """Calculate the pitch difference (self - other), exponent by exponent."""
        return Pitch(
            tuple(self.hs_array[d] - other.hs_array[d] for d in range(len(self.dims))),
            self.dims,
        )
# ============================================================================
# CHORD
# ============================================================================
class Chord:
    """
    A tuple of pitches in harmonic space.

    Equality and hashing use the pitch tuple exactly as given, so the order
    of the pitches matters and ``dims`` is not compared. Whether the pitches
    form a connected sublattice can be queried with ``is_connected``.
    """

    def __init__(self, pitches: tuple[Pitch, ...], dims: tuple[int, ...] | None = None):
        """
        Build a chord from a tuple of pitches.

        Args:
            pitches: Tuple of Pitch objects
            dims: Harmonic space dimensions (defaults to DIMS_7)
        """
        self._pitches = pitches
        self.dims = DIMS_7 if dims is None else dims

    def __hash__(self) -> int:
        return hash(self._pitches)

    def __eq__(self, other: object) -> bool:
        if isinstance(other, Chord):
            return self._pitches == other._pitches
        return NotImplemented

    def __repr__(self) -> str:
        return f"Chord({self._pitches})"

    def __iter__(self) -> Iterator[Pitch]:
        return iter(self._pitches)

    def __len__(self) -> int:
        return len(self._pitches)

    def __getitem__(self, index: int) -> Pitch:
        return self._pitches[index]

    @property
    def pitches(self) -> tuple[Pitch, ...]:
        """The pitches, in the order they were supplied."""
        return self._pitches

    @property
    def collapsed_pitches(self) -> set[Pitch]:
        """The set of pitch classes obtained by collapsing every pitch."""
        return {member.collapse() for member in self._pitches}

    def is_connected(self) -> bool:
        """
        Tell whether the pitches form one connected component.

        Two pitches are linked when ``_is_adjacent`` accepts them; the chord
        is connected when every pitch is reachable from the first one through
        such links. Chords with zero or one pitch count as connected.
        """
        members = self._pitches
        if len(members) <= 1:
            return True

        # Undirected adjacency between every pair of member pitches.
        links = {member: set() for member in members}
        for left, right in combinations(members, 2):
            if self._is_adjacent(left, right):
                links[left].add(right)
                links[right].add(left)

        # Flood fill outward from the first pitch.
        reached = {members[0]}
        pending = [members[0]]
        while pending:
            for neighbour in links[pending.pop()]:
                if neighbour not in reached:
                    reached.add(neighbour)
                    pending.append(neighbour)
        return len(reached) == len(members)

    def _is_adjacent(self, p1: Pitch, p2: Pitch) -> bool:
        """
        Tell whether two pitches are one lattice step apart.

        Dimension 0 (the octave axis) is ignored. Over the remaining axes the
        pitches must differ by exactly 1 on one axis and by 0 on all others.
        """
        unit_steps = 0
        for axis in range(1, len(self.dims)):
            gap = abs(p1[axis] - p2[axis])
            if gap > 1:
                return False
            unit_steps += gap == 1
        return unit_steps == 1

    def symmetric_difference_size(self, other: Chord) -> int:
        """Count the pitch classes that belong to exactly one of the two chords."""
        own = {member.collapse() for member in self._pitches}
        theirs = {member.collapse() for member in other._pitches}
        return len(own ^ theirs)

    def size_difference(self, other: Chord) -> int:
        """Absolute difference between the two chords' pitch counts."""
        return abs(len(self._pitches) - len(other._pitches))

    def project_all(self) -> list[Pitch]:
        """Project every pitch to the [1, 2) range, keeping the order."""
        return [member.project() for member in self._pitches]

    def transpose(self, trans: Pitch) -> Chord:
        """Return a new chord with every pitch transposed by ``trans``."""
        moved = tuple(member.transpose(trans) for member in self._pitches)
        return Chord(moved, self.dims)

    def sorted_by_frequency(self) -> list[Pitch]:
        """Return the pitches ordered from lowest to highest frequency ratio."""
        return sorted(self._pitches, key=lambda member: member.to_fraction())
# ============================================================================
# HARMONIC SPACE
# ============================================================================
class HarmonicSpace:
"""
Harmonic space HS_l or collapsed harmonic space CHS_l.
A multidimensional lattice where each dimension corresponds to a prime factor.
"""
def __init__(self, dims: tuple[int, ...] = DIMS_7, collapsed: bool = True):
    """
    Initialize harmonic space.

    Args:
        dims: Tuple of primes defining the space (e.g., (2, 3, 5, 7))
        collapsed: If True, use collapsed harmonic space (CHS_l). The flag is
            stored as given and read by ``__repr__`` and ``_branch_from``.
    """
    self.dims = dims
    self.collapsed = collapsed
def __repr__(self) -> str:
suffix = " (collapsed)" if self.collapsed else ""
return f"HarmonicSpace({self.dims}{suffix})"
def pitch(self, hs_array: tuple[int, ...]) -> Pitch:
    """Create a Pitch from an exponent tuple, tagged with this space's dims."""
    return Pitch(hs_array, self.dims)
def chord(self, pitches: tuple[Pitch, ...]) -> Chord:
    """Create a Chord from a tuple of pitches, tagged with this space's dims."""
    return Chord(pitches, self.dims)
def root(self) -> Pitch:
    """Return the lattice origin: every exponent zero, i.e. the ratio 1/1."""
    origin = (0,) * len(self.dims)
    return self.pitch(origin)
def _branch_from(self, vertex: tuple[int, ...]) -> set[tuple[int, ...]]:
"""
Get all vertices adjacent to the given vertex.
For collapsed harmonic space, skip dimension 0 (the octave dimension).
"""
branches = set()
# Skip dimension 0 (octave) in collapsed harmonic space
start_dim = 1 if self.collapsed else 0
for i in range(start_dim, len(self.dims)):
for delta in (-1, 1):
branch = list(vertex)
branch[i] += delta
branches.add(tuple(branch))
return branches
def generate_connected_sets(
    self, min_size: int, max_size: int, collapsed: bool = True
) -> set[Chord]:
    """
    Enumerate connected sets that contain the root, within a size range.

    Growth starts from the root (all exponents zero) and repeatedly appends
    a lattice neighbour of any pitch already in the set.

    Args:
        min_size: Minimum number of pitches in a chord
        max_size: Maximum number of pitches in a chord
        collapsed: If True (default), neighbours never differ in dim 0 and
            every emitted pitch is projected into [1, 2). If False, dim 0 is
            a branching axis too and pitches are emitted unprojected.
            This argument, not ``self.collapsed``, controls the behaviour.
    Returns:
        Set of unique Chord objects
    """
    origin = tuple(0 for _ in self.dims)
    axes = range(1 if collapsed else 0, len(self.dims))

    def neighbours(vertex):
        """All vertices one unit step away from ``vertex`` along ``axes``."""
        found = set()
        for axis in axes:
            for step in (-1, 1):
                moved = list(vertex)
                moved[axis] += step
                found.add(tuple(moved))
        return found

    def emit(members):
        """Return the member arrays in the form the caller should receive."""
        if not collapsed:
            return members
        return tuple(self.pitch(arr).project().hs_array for arr in members)

    def grow(members, frontier, seen):
        """Yield ``members`` if its size qualifies, then every extension of it."""
        if min_size <= len(members) <= max_size:
            yield emit(members)
        if len(members) >= max_size:
            return
        # Work on a private copy so that vertices consumed by one sibling
        # branch are excluded from the later siblings only.
        seen = set(seen)
        for candidate in frontier:
            if candidate in seen:
                continue
            seen.add(candidate)
            yield from grow(
                members + (candidate,),
                frontier | neighbours(candidate),
                seen,
            )

    return {
        Chord(tuple(self.pitch(arr) for arr in arrays), self.dims)
        for arrays in grow((origin,), neighbours(origin), {origin})
    }
def generate_connected_sets_with_edges(
    self, min_size: int, max_size: int, symdiff_range: tuple[int, int]
) -> tuple[set[Chord], list[tuple[Chord, Chord, dict]]]:
    """
    Generate chords and find edges using sibling grouping.

    A "parent" of a chord is any connected sub-chord obtained by dropping
    one pitch. Chords that share a parent are siblings, and only sibling
    pairs are handed to ``_find_valid_edges``. Every parent of every chord
    is considered, so a chord can appear in several sibling groups.

    Args:
        min_size: Minimum number of pitches in a chord
        max_size: Maximum number of pitches in a chord
        symdiff_range: (min, max) symmetric difference for valid edges
    Returns:
        Tuple of (chords set, list of (source, target, edge data) triples).
        Each valid edge is listed in both directions, and an edge is listed
        once per distinct (source, target, movements, transposition).
    """
    from collections import defaultdict

    def signature(chord):
        """Order-independent key: the chord's exponent arrays, sorted."""
        return tuple(sorted(p.hs_array for p in chord.pitches))

    chords_set = self.generate_connected_sets(min_size, max_size)

    # Group every chord under each of its parents. Chords at the minimum
    # size have no parent inside the requested size range.
    parent_to_children: dict[tuple, list[Chord]] = defaultdict(list)
    for chord in chords_set:
        if len(chord) <= min_size:
            continue
        pitches = tuple(chord.pitches)
        for i in range(len(pitches)):
            candidate = Chord(pitches[:i] + pitches[i + 1:], self.dims)
            if candidate.is_connected():
                parent_to_children[signature(candidate)].append(chord)

    edges = []
    seen_edges = set()  # keys of edges already emitted, for deduplication
    for children in parent_to_children.values():
        if len(children) < 2:
            continue
        for c1, c2 in combinations(children, 2):
            # The keys depend only on the pair, so compute them once here
            # rather than once per edge candidate.
            c1_key = signature(c1)
            c2_key = signature(c2)
            for (
                trans,
                weight,
                movements,
                cent_diffs,
                voice_crossing,
                is_dt,
            ) in self._find_valid_edges(c1, c2, symdiff_range):
                edge_key = (
                    (c1_key, c2_key, tuple(sorted(movements.items()))),
                    trans.hs_array,
                )
                if edge_key not in seen_edges:
                    seen_edges.add(edge_key)
                    edges.append(
                        (
                            c1,
                            c2,
                            {
                                "transposition": trans,
                                "weight": weight,
                                "movements": movements,
                                "cent_diffs": cent_diffs,
                                "voice_crossing": voice_crossing,
                                "is_directly_tunable": is_dt,
                            },
                        )
                    )

                # Mirror edge c2 -> c1 with inverted transposition and mapping.
                inv_trans = self._invert_transposition(trans)
                rev_movements = self._reverse_movements(movements)
                rev_edge_key = (
                    (c2_key, c1_key, tuple(sorted(rev_movements.items()))),
                    inv_trans.hs_array,
                )
                if rev_edge_key not in seen_edges:
                    seen_edges.add(rev_edge_key)
                    edges.append(
                        (
                            c2,
                            c1,
                            {
                                "transposition": inv_trans,
                                "weight": weight,
                                "movements": rev_movements,
                                "cent_diffs": list(reversed(cent_diffs)),
                                "voice_crossing": voice_crossing,
                                "is_directly_tunable": is_dt,
                            },
                        )
                    )
    return chords_set, edges
def _is_terminating(self, pitch: Pitch, chord: Chord) -> bool:
"""Check if removing this pitch leaves the remaining pitches connected."""
remaining = tuple(p for p in chord.pitches if p != pitch)
if len(remaining) <= 1:
return True
remaining_chord = Chord(remaining, self.dims)
return remaining_chord.is_connected()
def build_graph_lattice_method(
    self,
    chords: set[Chord],
    symdiff_min: int = 2,
    symdiff_max: int = 2,
) -> nx.MultiDiGraph:
    """
    Build voice leading graph using lattice neighbor traversal.

    Algorithm:
        1. For each chord C in the set
        2. For each terminating pitch p in C (removing it keeps the rest connected)
        3. For each remaining pitch q, and each lattice neighbour r of q
           (one step along any dimension except dim 0) not already in C:
           form C' = (C without p) plus r.
           - If C' contains the root, look C' up among the known chords.
           - Otherwise transpose C' so that each of its pitches in turn
             lands on the root, and look each transposition up.
        4. Every hit that differs from C becomes an edge C -> hit, recorded
           at most once per (C, hit) pair; the first transposition found wins.
        5. Every recorded edge is added to the graph together with its mirror.

    NOTE(review): neighbours are generated without re-projecting dim 0, so
    lookups only succeed when ``chords`` holds pitches in that same form -
    confirm against how the chord set was generated.

    Args:
        chords: Set of Chord objects
        symdiff_min: Accepted for signature compatibility; not used here
        symdiff_max: Accepted for signature compatibility; not used here
    Returns:
        NetworkX MultiDiGraph
    """
    graph = nx.MultiDiGraph()
    for chord in chords:
        graph.add_node(chord)

    def signature(pitches):
        """Order-independent lookup key for a collection of pitches."""
        return tuple(sorted(p.hs_array for p in pitches))

    chord_index = {signature(chord.pitches): chord for chord in chords}
    root = self.pitch(tuple(0 for _ in self.dims))
    edges = []
    edge_set = set()  # (source, target) pairs that have already been examined

    def record_edge(source, candidate, trans):
        """Record source -> known chord matching ``candidate``, once per pair.

        ``trans`` is the transposition that was applied to reach
        ``candidate``; None stands for the zero transposition.
        """
        target = chord_index.get(signature(candidate.pitches))
        if target is None or target == source:
            return
        edge_key = (source, target)
        if edge_key in edge_set:
            return
        # Mark the pair before validating it so that a rejected pair is not
        # re-examined when it is reached again through another neighbour.
        edge_set.add(edge_key)
        movements, cent_diffs, voice_crossing = self._compute_edge_data_fast(
            source, target
        )
        if movements is None:
            return
        is_dt = self._is_directly_tunable(source.pitches, target.pitches, movements)
        edges.append(
            (
                source,
                target,
                {
                    "transposition": (
                        root.pitch_difference(root) if trans is None else trans
                    ),
                    "weight": 1.0,
                    "movements": movements,
                    "cent_diffs": cent_diffs,
                    "voice_crossing": voice_crossing,
                    "is_directly_tunable": is_dt,
                },
            )
        )

    for chord in chords:
        chord_pitches = list(chord.pitches)
        for p in chord_pitches:
            if not self._is_terminating(p, chord):
                continue
            remaining = [x for x in chord_pitches if x != p]
            for q in remaining:
                # Neighbours of q in CHS: step along every axis except dim 0.
                for d in range(1, len(self.dims)):
                    for delta in (-1, 1):
                        arr = list(q.hs_array)
                        arr[d] += delta
                        r = Pitch(tuple(arr), self.dims)
                        if r in chord_pitches:
                            continue
                        new_chord = Chord(tuple(remaining + [r]), self.dims)
                        if root in new_chord.pitches:
                            record_edge(chord, new_chord, None)
                            continue
                        # No root present: try each pitch as the new root.
                        for anchor in new_chord.pitches:
                            trans = root.pitch_difference(anchor)
                            transposed = new_chord.transpose(trans)
                            if root in transposed.pitches:
                                record_edge(chord, transposed, trans)

    for u, v, data in edges:
        graph.add_edge(u, v, **data)
        inv_trans = self._invert_transposition(data["transposition"])
        inv_movements = self._reverse_movements(data["movements"])
        inv_cent_diffs = list(reversed(data["cent_diffs"]))
        graph.add_edge(
            v,
            u,
            transposition=inv_trans,
            weight=1.0,
            movements=inv_movements,
            cent_diffs=inv_cent_diffs,
            voice_crossing=data["voice_crossing"],
            is_directly_tunable=data["is_directly_tunable"],
        )
    return graph
def _compute_edge_data_fast(self, c1: Chord, c2: Chord):
"""Compute edge data directly from two chords without transposition."""
c1_pitches = c1.pitches
c2_pitches = c2.pitches
k = len(c1_pitches)
c1_collapsed = [p.collapse() for p in c1_pitches]
c2_collapsed = [p.collapse() for p in c2_pitches]
common_c1 = []
common_c2 = []
for i, pc1 in enumerate(c1_collapsed):
for j, pc2 in enumerate(c2_collapsed):
if pc1 == pc2:
common_c1.append(i)
common_c2.append(j)
break
movements = {}
for src_idx, dest_idx in zip(common_c1, common_c2):
movements[src_idx] = dest_idx
changing_c1 = [i for i in range(k) if i not in common_c1]
changing_c2 = [j for j in range(k) if j not in common_c2]
if len(changing_c1) != len(changing_c2):
return None, None, None
if changing_c1:
valid = True
for src_i, dest_j in zip(changing_c1, changing_c2):
p1 = c1_pitches[src_i]
p2 = c2_pitches[dest_j]
if not self._is_adjacent_pitches(p1, p2):
valid = False
break
movements[src_i] = dest_j
if not valid:
return None, None, None
cent_diffs = []
for src_idx, dest_idx in movements.items():
src_pitch = c1_pitches[src_idx]
dst_pitch = c2_pitches[dest_idx]
cents = abs(src_pitch.to_cents() - dst_pitch.to_cents())
cent_diffs.append(cents)
voice_crossing = not all(movements.get(i, i) == i for i in range(k))
return movements, cent_diffs, voice_crossing
def _wrap_pitch(self, hs_array: tuple[int, ...]) -> tuple[int, ...]:
"""Wrap a pitch so its frequency ratio is in [1, 2)."""
p = self.pitch(hs_array)
return p.collapse().hs_array
def _toCHS(self, hs_array: tuple[int, ...]) -> tuple[int, ...]:
"""
Convert a pitch to Collapsed Harmonic Space (CHS).
In CHS, all pitches have dimension 0 = 0.
This is different from collapse() which only ensures frequency in [1, 2).
Steps:
1. First collapse to [1,2) to get pitch class
2. Then set dimension 0 = 0
"""
# First collapse to [1,2)
p = self.pitch(hs_array)
collapsed = p.collapse().hs_array
# Then set dim 0 = 0
result = list(collapsed)
result[0] = 0
return tuple(result)
def build_voice_leading_graph(
    self,
    chords: set[Chord],
    symdiff_min: int = 2,
    symdiff_max: int = 2,
) -> nx.MultiDiGraph:
    """
    Build a voice leading graph by testing every unordered pair of chords.

    Each pair is passed to ``_find_valid_edges``; every result becomes one
    edge in each direction. The mirror edge carries the inverted
    transposition, the reversed movement map and the reversed cent list,
    while weight, voice_crossing and is_directly_tunable are shared.

    Args:
        chords: Set of Chord objects
        symdiff_min: Minimum symmetric difference between chords
        symdiff_max: Maximum symmetric difference between chords
    Returns:
        NetworkX MultiDiGraph
    """
    bounds = (symdiff_min, symdiff_max)
    graph = nx.MultiDiGraph()
    for node in chords:
        graph.add_node(node)

    for source, target in combinations(chords, 2):
        for (
            trans,
            weight,
            movements,
            cent_diffs,
            voice_crossing,
            tunable,
        ) in self._find_valid_edges(source, target, bounds):
            graph.add_edge(
                source,
                target,
                transposition=trans,
                weight=weight,
                movements=movements,
                cent_diffs=cent_diffs,
                voice_crossing=voice_crossing,
                is_directly_tunable=tunable,
            )
            graph.add_edge(
                target,
                source,
                transposition=self._invert_transposition(trans),
                weight=weight,
                movements=self._reverse_movements(movements),
                cent_diffs=list(reversed(cent_diffs)),
                voice_crossing=voice_crossing,
                is_directly_tunable=tunable,
            )
    return graph
def _reverse_movements(self, movements: dict) -> dict:
"""Reverse the movement mappings (index to index)."""
reversed_movements = {}
for src_idx, dest_idx in movements.items():
reversed_movements[dest_idx] = src_idx
return reversed_movements
def _is_directly_tunable(
self,
c1_pitches: tuple[Pitch, ...],
c2_transposed_pitches: tuple[Pitch, ...],
movements: dict,
) -> bool:
"""
Check if all changing pitches are adjacent (directly tunable) to a staying pitch.
A changing pitch is directly tunable if it differs from a staying pitch
by exactly one prime dimension (±1 in one dimension, 0 in all others).
"""
# Find staying pitches (where movement is identity: i -> i)
staying_indices = [i for i in range(len(c1_pitches)) if movements.get(i) == i]
if not staying_indices:
return False # No staying pitch to tune to
# Find changing pitches
changing_indices = [
i for i in range(len(c1_pitches)) if i not in staying_indices
]
if not changing_indices:
return True # No changing pitches = directly tunable
# For each changing pitch, check if it's adjacent to any staying pitch
for ch_idx in changing_indices:
ch_pitch = c2_transposed_pitches[ch_idx]
is_adjacent_to_staying = False
for st_idx in staying_indices:
st_pitch = c1_pitches[st_idx]
if self._is_adjacent_pitches(st_pitch, ch_pitch):
is_adjacent_to_staying = True
break
if not is_adjacent_to_staying:
return False
return True
def _find_valid_edges(
self,
c1: Chord,
c2: Chord,
symdiff_range: tuple[int, int],
) -> list[tuple[Pitch, float, dict, list[float], bool, bool]]:
"""
Find all valid edges between two chords.
Tests all transpositions of c2 to find ones that satisfy
the symmetric difference constraint AND each changing pitch
is connected (adjacent) to a pitch in the previous chord.
Returns:
List of (transposition, weight, movements, cent_diffs, voice_crossing, is_directly_tunable) tuples.
- movements: dict {src_idx: dest_idx}
- cent_diffs: list of cent differences per voice
- voice_crossing: True if voices cross
- is_directly_tunable: True if all changing pitches adjacent to staying pitch
"""
edges = []
# Get unique transpositions first (fast deduplication)
transpositions = {
p1.pitch_difference(p2) for p1 in c1.pitches for p2 in c2.pitches
}
# Try each unique transposition
for trans in transpositions:
# Transpose c2
c2_transposed = c2.transpose(trans)
# Check symmetric difference on transposed pitches (not collapsed)
symdiff = self._calc_symdiff_expanded(c1, c2_transposed)
if not (symdiff_range[0] <= symdiff <= symdiff_range[1]):
continue
# CRITICAL: Each changing pitch must be connected to a pitch in c1
voice_lead_ok = self._check_voice_leading_connectivity(c1, c2_transposed)
if not voice_lead_ok:
continue
# Build all valid movement maps (one per permutation of changing pitches)
movement_maps = self._build_movement_maps(c1.pitches, c2_transposed.pitches)
# Create one edge per movement map with computed edge properties
for movements in movement_maps:
# Compute cent_diffs for each voice
cent_diffs = []
for src_idx, dest_idx in movements.items():
src_pitch = c1.pitches[src_idx]
dst_pitch = c2_transposed.pitches[dest_idx]
cents = abs(src_pitch.to_cents() - dst_pitch.to_cents())
cent_diffs.append(cents)
# Check voice_crossing: True if any voice moves to different position
num_voices = len(c1.pitches)
voice_crossing = not all(
movements.get(i, i) == i for i in range(num_voices)
)
# Check is_directly_tunable: changing pitches are adjacent to staying pitch
is_directly_tunable = self._is_directly_tunable(
c1.pitches, c2_transposed.pitches, movements
)
edges.append(
(
trans,
1.0,
movements,
cent_diffs,
voice_crossing,
is_directly_tunable,
)
)
return edges
def _build_movement_maps(
self, c1_pitches: tuple[Pitch, ...], c2_transposed_pitches: tuple[Pitch, ...]
) -> list[dict]:
"""
Build all valid movement maps for c1 -> c2_transposed.
A movement map shows which pitch in c1 maps to which pitch in c2,
including the cent difference for each movement.
Returns:
List of movement maps. Each map is {source_pitch: {"destination": dest_pitch, "cent_difference": cents}}
There may be multiple valid maps if multiple changing pitches can be permuted.
"""
# Find common pitches (same pitch class in both)
c1_collapsed = [p.collapse() for p in c1_pitches]
c2_collapsed = [p.collapse() for p in c2_transposed_pitches]
common_indices_c1 = []
common_indices_c2 = []
for i, pc1 in enumerate(c1_collapsed):
for j, pc2 in enumerate(c2_collapsed):
if pc1 == pc2:
common_indices_c1.append(i)
common_indices_c2.append(j)
break
# Get changing pitch indices
changing_indices_c1 = [
i for i in range(len(c1_pitches)) if i not in common_indices_c1
]
changing_indices_c2 = [
i for i in range(len(c2_transposed_pitches)) if i not in common_indices_c2
]
# Build base map for common pitches: index -> index
base_map = {}
for i in common_indices_c1:
dest_idx = common_indices_c2[common_indices_c1.index(i)]
base_map[i] = dest_idx
# If no changing pitches, return just the base map
if not changing_indices_c1:
return [base_map]
# For changing pitches, find all valid permutations
# Each changing pitch in c2 must be adjacent to some pitch in c1
c1_changing = [c1_pitches[i] for i in changing_indices_c1]
c2_changing = [c2_transposed_pitches[i] for i in changing_indices_c2]
# Find valid pairings: which c1 pitch can map to which c2 pitch (must be adjacent)
valid_pairings = []
for p1 in c1_changing:
pairings = []
for p2 in c2_changing:
if self._is_adjacent_pitches(p1, p2):
cents = abs(p1.to_cents() - p2.to_cents())
pairings.append((p1, p2, cents))
valid_pairings.append(pairings)
# Generate all permutations and filter valid ones
from itertools import permutations
all_maps = []
num_changing = len(c2_changing)
# For each permutation of c2_changing indices
for perm in permutations(range(num_changing)):
new_map = dict(base_map) # Start with common pitches
valid = True
for i, c1_idx in enumerate(changing_indices_c1):
dest_idx = changing_indices_c2[perm[i]]
new_map[c1_idx] = dest_idx
if valid:
all_maps.append(new_map)
return all_maps
def _calc_symdiff_expanded(self, c1: Chord, c2: Chord) -> int:
"""Calculate symmetric difference on transposed (expanded) pitches.
Uses the transposed pitches directly without collapsing.
"""
set1 = set(c1.pitches)
set2 = set(c2.pitches)
return len(set1.symmetric_difference(set2))
def _check_voice_leading_connectivity(self, c1: Chord, c2: Chord) -> bool:
"""
Check that each pitch that changes is connected (adjacent in lattice)
to some pitch in the previous chord.
Uses transposed pitches directly without collapsing.
"""
# Use pitches directly (transposed form)
c1_pitches = set(c1.pitches)
c2_pitches = set(c2.pitches)
# Find pitches that change
common = c1_pitches & c2_pitches
changing = c2_pitches - c1_pitches
if not changing:
return False # No change = no edge
# For each changing pitch, check if it's adjacent to any pitch in c1
for p2 in changing:
is_adjacent = False
for p1 in c1_pitches:
if self._is_adjacent_pitches(p1, p2):
is_adjacent = True
break
if not is_adjacent:
return False # A changing pitch is not connected
return True
def _is_adjacent_pitches(self, p1: Pitch, p2: Pitch) -> bool:
"""Check if two collapsed pitches are adjacent (differ by ±1 in one dimension).
For collapsed harmonic space, skip dimension 0 (the octave dimension).
"""
diff_count = 0
# Skip dimension 0 (octave) in CHS
for d in range(1, len(self.dims)):
diff = abs(p1[d] - p2[d])
if diff > 1:
return False
if diff == 1:
diff_count += 1
return diff_count == 1
def _check_melodic_threshold(
self,
movements: dict,
threshold_cents: float,
) -> bool:
"""Check if changing pitch movements stay within melodic threshold.
Args:
movements: Dict mapping source pitch -> {destination, cent_difference}
threshold_cents: Maximum allowed movement in cents
Returns:
True if all movements are within threshold.
Common pitches (0 cents) always pass.
Changing pitches must have cent_difference <= threshold.
"""
for src, data in movements.items():
cents = data["cent_difference"]
# Common pitches have 0 cent difference - always pass
# Changing pitches: check if movement is within threshold
if cents > threshold_cents:
return False
return True
def _invert_transposition(self, trans: Pitch) -> Pitch:
    """Return the transposition that undoes ``trans`` (every exponent negated)."""
    negated = tuple(-step for step in trans.hs_array)
    return Pitch(negated, self.dims)
# ============================================================================
# PATH FINDER
# ============================================================================
class PathFinder:
"""Finds paths through voice leading graphs."""
def __init__(self, graph: nx.MultiDiGraph):
    """Store the voice leading graph whose nodes and edges will be walked."""
    self.graph = graph
def find_stochastic_path(
    self,
    start_chord: Chord | None = None,
    max_length: int = 100,
    weights_config: dict | None = None,
) -> list[Chord]:
    """
    Find a stochastic path through the graph.

    At every step the outgoing edges of the current graph node are weighted
    by ``_calculate_edge_weights`` and one is drawn at random. The edge's
    transposition is accumulated, and the chord appended to the path is the
    target node transposed by that running total and reordered according to
    the accumulated voice mapping.

    The walk ends early when the current node has no outgoing edge or when
    every outgoing edge has weight zero.

    Args:
        start_chord: Starting chord (random if None)
        max_length: Maximum number of steps taken after the start chord
        weights_config: Configuration for edge weighting (defaults to
            ``_default_weights_config()``)
    Returns:
        List of Chord objects representing the path; empty if the graph has
        no nodes.
    """
    if weights_config is None:
        weights_config = self._default_weights_config()

    chords = self._initialize_chords(start_chord)
    current = chords[-1][0] if chords else None
    if current is None or len(self.graph.nodes()) == 0:
        return []

    path = [current]
    last_graph_nodes = (current,)

    # Running sum of all edge transpositions, starting from zero.
    dims = current.dims
    cumulative_trans = Pitch(tuple(0 for _ in range(len(dims))), dims)

    # voice_map[i] = which original voice is at position i (identity at start).
    num_voices = len(current.pitches)
    voice_map = list(range(num_voices))

    for _ in range(max_length):
        out_edges = list(self.graph.out_edges(current, data=True))
        if not out_edges:
            break

        weights = self._calculate_edge_weights(
            out_edges, path, last_graph_nodes, weights_config
        )
        # random.choices raises ValueError when the weights sum to zero,
        # which happens when every outgoing edge was rejected. Treat that
        # as a dead end instead of crashing.
        if not any(w > 0 for w in weights):
            break

        edge = choices(out_edges, weights=weights)[0]
        next_node = edge[1]
        trans = edge[2].get("transposition")
        movement = edge[2].get("movements", {})

        # Compose the voice mapping with this edge's movement map:
        # the voice at src_idx moves to dest_idx.
        new_voice_map = [None] * num_voices
        for src_idx, dest_idx in movement.items():
            new_voice_map[dest_idx] = voice_map[src_idx]
        voice_map = new_voice_map

        if trans is not None:
            cumulative_trans = cumulative_trans.transpose(trans)

        transposed = next_node.transpose(cumulative_trans)
        reordered_pitches = tuple(
            transposed.pitches[voice_map[i]] for i in range(num_voices)
        )
        sounding_chord = Chord(reordered_pitches, dims)

        # Continue from the untransposed graph node; record what sounds.
        current = next_node
        path.append(sounding_chord)
        # Keep only the two most recent graph nodes.
        last_graph_nodes = (last_graph_nodes + (current,))[-2:]
    return path
def _initialize_chords(self, start_chord: Chord | None) -> tuple:
"""Initialize chord sequence."""
if start_chord is not None:
return ((start_chord, start_chord),)
# Random start
nodes = list(self.graph.nodes())
if nodes:
return ((choice(nodes), choice(nodes)),)
return ()
def _default_weights_config(self) -> dict:
"""Default weights configuration."""
return {
"contrary_motion": True,
"direct_tuning": True,
"voice_crossing_allowed": False, # False = reject edges with voice crossing
"melodic_threshold_min": 0,
"melodic_threshold_max": 500,
}
def _calculate_edge_weights(
self,
out_edges: list,
path: list[Chord],
last_chords: tuple[Chord, ...],
config: dict,
) -> list[float]:
"""Calculate weights for edges based on configuration."""
weights = []
# Get melodic threshold settings
melodic_min = config.get("melodic_threshold_min", 0)
melodic_max = config.get("melodic_threshold_max", float("inf"))
for edge in out_edges:
w = 1.0
edge_data = edge[2]
# Read pre-computed edge properties from graph
cent_diffs = edge_data.get("cent_diffs", [])
voice_crossing = edge_data.get("voice_crossing", False)
is_directly_tunable = edge_data.get("is_directly_tunable", False)
# Melodic threshold check: ALL movements must be within min/max range
if melodic_min is not None or melodic_max is not None:
all_within_range = True
for cents in cent_diffs:
if melodic_min is not None and cents < melodic_min:
all_within_range = False
break
if melodic_max is not None and cents > melodic_max:
all_within_range = False
break
if all_within_range:
w *= 10 # Boost for within range
else:
w = 0.0 # Penalty for outside range
if w == 0.0:
weights.append(w)
continue
# Contrary motion weight
if config.get("contrary_motion", False):
if len(cent_diffs) >= 3:
sorted_diffs = sorted(cent_diffs)
if sorted_diffs[0] < 0 and sorted_diffs[-1] > 0:
w *= 100
# Direct tuning weight
if config.get("direct_tuning", False):
if is_directly_tunable:
w *= 10
# Voice crossing check - reject edges where voices cross (if not allowed)
if not config.get("voice_crossing_allowed", False):
if edge_data.get("voice_crossing", False):
w = 0.0 # Reject edges with voice crossing
weights.append(w)
return weights
def is_hamiltonian(self, path: list[Chord]) -> bool:
    """Report whether *path* has exactly one distinct entry per graph node.

    True only when the path is as long as the graph's node count and no
    entry repeats. Whether the entries are themselves graph nodes is not
    checked.
    """
    # Length test first: a mismatch returns before any entry is hashed.
    if len(path) != len(self.graph.nodes()):
        return False
    distinct = set(path)
    return len(distinct) == len(path)
# ============================================================================
# I/O
# ============================================================================
def write_chord_sequence(seq: list[Chord], path: str) -> None:
    """Serialize a chord sequence to *path* as indented JSON.

    The document is a list of chords; each chord is a list with one object
    per pitch carrying the keys "hs_array", "fraction" and "cents".
    """
    import json

    # One inner list per chord, one dict per pitch.
    payload = [
        [
            {
                "hs_array": list(p.hs_array),
                "fraction": str(p.to_fraction()),
                "cents": p.to_cents(),
            }
            for p in chord._pitches
        ]
        for chord in seq
    ]

    text = json.dumps(payload, indent=2)
    # Bracket-layout substitutions, applied in this fixed order.
    for old, new in (
        ("[[[", "[\n\t[["),
        (", [[", ",\n\t[["),
        ("]]]", "]]\n]"),
    ):
        text = text.replace(old, new)

    with open(path, "w") as out:
        out.write(text)
def write_chord_sequence_readable(seq: list[Chord], path: str) -> None:
    """Dump the sequence as a parenthesized listing, one chord per line.

    Each line holds the tuple of that chord's pitch ``hs_array`` values
    followed by a comma; the listing is wrapped in "(" and ")" lines.
    """

    def _lines():
        # Produced lazily so the file is opened before any chord is read.
        yield "(\n"
        for chord in seq:
            arrays = tuple(pitch.hs_array for pitch in chord._pitches)
            yield f" {arrays},\n"
        yield ")\n"

    with open(path, "w") as out:
        out.writelines(_lines())
# ============================================================================
# MAIN / DEMO
# ============================================================================
def main(argv: list[str] | None = None) -> None:
    """Demo: generate compact sets, build the graph and write a random path.

    Parses command-line options, generates connected sets of the requested
    size, builds the voice leading graph, draws one stochastic path and
    writes it to ``output_chords.json`` and ``output_chords.txt``.

    Args:
        argv: Arguments to parse. ``None`` (the default) lets argparse read
            ``sys.argv[1:]``, which is what the script entry point uses.
    """
    import argparse

    parser = argparse.ArgumentParser(
        description="Generate chord paths in harmonic space"
    )
    parser.add_argument(
        "--symdiff-min",
        type=int,
        default=2,
        help="Minimum symmetric difference between chords",
    )
    parser.add_argument(
        "--symdiff-max",
        type=int,
        default=2,
        help="Maximum symmetric difference between chords",
    )
    parser.add_argument(
        "--melodic-min",
        type=int,
        default=0,
        help="Minimum cents for any pitch movement (0 = no minimum)",
    )
    parser.add_argument(
        "--melodic-max",
        type=int,
        default=500,
        help="Maximum cents for any pitch movement (0 = no maximum)",
    )
    parser.add_argument(
        "--dims", type=int, default=7, help="Number of prime dimensions (4, 5, 7, or 8)"
    )
    parser.add_argument("--chord-size", type=int, default=3, help="Size of chords")
    parser.add_argument("--max-path", type=int, default=50, help="Maximum path length")
    parser.add_argument("--seed", type=int, default=42, help="Random seed")
    args = parser.parse_args(argv)

    # Select dims based on argument; unsupported counts fall back to 7.
    dims_by_count = {4: DIMS_4, 5: DIMS_5, 7: DIMS_7, 8: DIMS_8}
    dims = dims_by_count.get(args.dims, DIMS_7)

    # Set up harmonic space
    space = HarmonicSpace(dims, collapsed=True)
    print(f"Space: {space}")
    print(f"Symdiff: {args.symdiff_min} to {args.symdiff_max}")

    # Generate connected sets
    print("Generating connected sets...")
    chords = space.generate_connected_sets(
        min_size=args.chord_size, max_size=args.chord_size
    )
    print(f"Found {len(chords)} unique chords")

    # Build voice leading graph
    print("Building voice leading graph...")
    graph = space.build_voice_leading_graph(
        chords,
        symdiff_min=args.symdiff_min,
        symdiff_max=args.symdiff_max,
    )
    print(f"Graph: {graph.number_of_nodes()} nodes, {graph.number_of_edges()} edges")

    # Find stochastic path
    print("Finding stochastic path...")
    path_finder = PathFinder(graph)
    seed(args.seed)

    # Set up weights config with melodic thresholds. The help text documents
    # 0 as "no limit", while the weighting code treats None as "no limit"
    # and any number (0 included) as a literal bound, so translate 0 here.
    weights_config = path_finder._default_weights_config()
    weights_config["melodic_threshold_min"] = args.melodic_min or None
    weights_config["melodic_threshold_max"] = args.melodic_max or None

    path = path_finder.find_stochastic_path(
        max_length=args.max_path, weights_config=weights_config
    )
    print(f"Path length: {len(path)}")

    # Write output
    write_chord_sequence(path, "output_chords.json")
    print("Written to output_chords.json")
    write_chord_sequence_readable(path, "output_chords.txt")
    print("Written to output_chords.txt")
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()