Refactor into src/ module with caching and CLI improvements

- Split monolithic compact_sets.py into modular src/ directory
- Add graph caching (pickle + JSON) with --cache-dir option
- Add --output-dir, --rebuild-cache, --no-cache CLI options
- Default seed is now None (random) instead of 42
- Add .gitignore entries for cache/ and output/
This commit is contained in:
Michael Winter 2026-03-13 18:38:38 +01:00
parent dd9df2ad33
commit 0698d01d85
8 changed files with 1287 additions and 1177 deletions

8
.gitignore vendored
View file

@ -1,7 +1,11 @@
__pycache__/ __pycache__/
*.pyc *.pyc
output_*.json
output_*.txt
.venv/ .venv/
venv/ venv/
.pytest_cache/ .pytest_cache/
ruff_cache/
# Generated outputs
cache/
output/

File diff suppressed because it is too large Load diff

23
src/__init__.py Normal file
View file

@ -0,0 +1,23 @@
#!/usr/bin/env python
"""
Compact Sets: A rational theory of harmony
Based on Michael Winter's theory of conjunct connected sets in harmonic space,
combining ideas from Tom Johnson, James Tenney, and Larry Polansky.
"""
from .pitch import Pitch, DIMS_4, DIMS_5, DIMS_7, DIMS_8
from .chord import Chord
from .harmonic_space import HarmonicSpace
from .graph import PathFinder
__all__ = [
"Pitch",
"Chord",
"HarmonicSpace",
"PathFinder",
"DIMS_4",
"DIMS_5",
"DIMS_7",
"DIMS_8",
]

116
src/chord.py Normal file
View file

@ -0,0 +1,116 @@
#!/usr/bin/env python
"""
Chord class - a set of pitches forming a connected subgraph in harmonic space.
A chord is a tuple of Pitches. Two chords are equivalent under
transposition if they have the same intervallic structure.
"""
from __future__ import annotations
from typing import Iterator
from .pitch import Pitch
class Chord:
def __init__(self, pitches: tuple[Pitch, ...], dims: tuple[int, ...] | None = None):
from .pitch import DIMS_7
self.dims = dims if dims is not None else DIMS_7
self._pitches = pitches
def __hash__(self) -> int:
return hash(self._pitches)
def __eq__(self, other: object) -> bool:
if not isinstance(other, Chord):
return NotImplemented
return self._pitches == other._pitches
def __repr__(self) -> str:
return f"Chord({self._pitches})"
def __iter__(self) -> Iterator[Pitch]:
return iter(self._pitches)
def __len__(self) -> int:
return len(self._pitches)
def __getitem__(self, index: int) -> Pitch:
return self._pitches[index]
@property
def pitches(self) -> tuple[Pitch, ...]:
"""Get the pitches as a tuple."""
return self._pitches
@property
def collapsed_pitches(self) -> set[Pitch]:
"""Get all pitches collapsed to pitch class."""
return set(p.collapse() for p in self._pitches)
def is_connected(self) -> bool:
"""
Check if the chord forms a connected subgraph in harmonic space.
A set is connected if every pitch can be reached from every other
by stepping through adjacent pitches (differing by ±1 in one dimension).
"""
if len(self._pitches) <= 1:
return True
adj = {p: set() for p in self._pitches}
for i, p1 in enumerate(self._pitches):
for p2 in self._pitches[i + 1 :]:
if self._is_adjacent(p1, p2):
adj[p1].add(p2)
adj[p2].add(p1)
visited = {self._pitches[0]}
queue = [self._pitches[0]]
while queue:
current = queue.pop(0)
for neighbor in adj[current]:
if neighbor not in visited:
visited.add(neighbor)
queue.append(neighbor)
return len(visited) == len(self._pitches)
def _is_adjacent(self, p1: Pitch, p2: Pitch) -> bool:
"""Check if two pitches are adjacent (differ by ±1 in exactly one dimension).
For collapsed harmonic space, skip dimension 0 (the octave dimension).
"""
diff_count = 0
for d in range(1, len(self.dims)):
diff = abs(p1[d] - p2[d])
if diff > 1:
return False
if diff == 1:
diff_count += 1
return diff_count == 1
def symmetric_difference_size(self, other: Chord) -> int:
"""Calculate the size of symmetric difference between two chords."""
set1 = set(p.collapse() for p in self._pitches)
set2 = set(p.collapse() for p in other._pitches)
return len(set1.symmetric_difference(set2))
def size_difference(self, other: Chord) -> int:
"""Calculate the absolute difference in chord sizes."""
return abs(len(self._pitches) - len(other._pitches))
def project_all(self) -> list[Pitch]:
"""Project all pitches to [1, 2) range."""
return [p.project() for p in self._pitches]
def transpose(self, trans: Pitch) -> Chord:
"""Transpose the entire chord."""
return Chord(tuple(p.transpose(trans) for p in self._pitches), self.dims)
def sorted_by_frequency(self) -> list[Pitch]:
"""Sort pitches by frequency (low to high)."""
return sorted(self._pitches, key=lambda p: p.to_fraction())

234
src/graph.py Normal file
View file

@ -0,0 +1,234 @@
#!/usr/bin/env python
"""
PathFinder - finds paths through voice leading graphs.
"""
from __future__ import annotations
import networkx as nx
from random import choices, seed
from typing import Iterator
class PathFinder:
"""Finds paths through voice leading graphs."""
def __init__(self, graph: nx.MultiDiGraph):
self.graph = graph
def find_stochastic_path(
self,
start_chord: "Chord | None" = None,
max_length: int = 100,
weights_config: dict | None = None,
) -> list["Chord"]:
"""Find a stochastic path through the graph."""
if weights_config is None:
weights_config = self._default_weights_config()
chord = self._initialize_chords(start_chord)
if not chord or chord[0] is None or len(self.graph.nodes()) == 0:
return []
original_chord = chord[0]
graph_node = original_chord
output_chord = original_chord
path = [output_chord]
last_graph_nodes = (graph_node,)
graph_path = [graph_node]
from .pitch import Pitch
dims = output_chord.dims
cumulative_trans = Pitch(tuple(0 for _ in range(len(dims))), dims)
num_voices = len(output_chord.pitches)
voice_map = list(range(num_voices))
voice_stay_count = [0] * num_voices
for _ in range(max_length):
out_edges = list(self.graph.out_edges(graph_node, data=True))
if not out_edges:
break
weights = self._calculate_edge_weights(
out_edges,
path,
last_graph_nodes,
weights_config,
tuple(voice_stay_count),
graph_path,
)
edge = choices(out_edges, weights=weights)[0]
next_graph_node = edge[1]
trans = edge[2].get("transposition")
movement = edge[2].get("movements", {})
for src_idx, dest_idx in movement.items():
if src_idx == dest_idx:
voice_stay_count[src_idx] += 1
else:
voice_stay_count[src_idx] = 0
new_voice_map = [None] * num_voices
for src_idx, dest_idx in movement.items():
new_voice_map[dest_idx] = voice_map[src_idx]
voice_map = new_voice_map
if trans is not None:
cumulative_trans = cumulative_trans.transpose(trans)
transposed = next_graph_node.transpose(cumulative_trans)
reordered_pitches = tuple(
transposed.pitches[voice_map[i]] for i in range(num_voices)
)
from .chord import Chord
output_chord = Chord(reordered_pitches, dims)
graph_node = next_graph_node
graph_path.append(graph_node)
path.append(output_chord)
last_graph_nodes = last_graph_nodes + (graph_node,)
if len(last_graph_nodes) > 2:
last_graph_nodes = last_graph_nodes[-2:]
return path
def _initialize_chords(self, start_chord: "Chord | None") -> tuple:
"""Initialize chord sequence."""
if start_chord is not None:
return (start_chord,)
nodes = list(self.graph.nodes())
if nodes:
import random
random.shuffle(nodes)
weights_config = self._default_weights_config()
weights_config["voice_crossing_allowed"] = False
for chord in nodes[:50]:
out_edges = list(self.graph.out_edges(chord, data=True))
if len(out_edges) == 0:
continue
weights = self._calculate_edge_weights(
out_edges, [chord], (chord,), weights_config, None
)
nonzero = sum(1 for w in weights if w > 0)
if nonzero > 0:
return (chord,)
return (nodes[0],)
return (None,)
def _default_weights_config(self) -> dict:
"""Default weights configuration."""
return {
"contrary_motion": True,
"direct_tuning": True,
"voice_crossing_allowed": False,
"melodic_threshold_min": 0,
"melodic_threshold_max": 500,
"hamiltonian": True,
"dca": 2.0,
}
def _calculate_edge_weights(
self,
out_edges: list,
path: list["Chord"],
last_chords: tuple["Chord", ...],
config: dict,
voice_stay_count: tuple[int, ...] | None = None,
graph_path: list["Chord"] | None = None,
) -> list[float]:
"""Calculate weights for edges based on configuration."""
weights = []
dca_multiplier = config.get("dca", 0)
if dca_multiplier is None:
dca_multiplier = 0
melodic_min = config.get("melodic_threshold_min", 0)
melodic_max = config.get("melodic_threshold_max", float("inf"))
for edge in out_edges:
w = 1.0
edge_data = edge[2]
cent_diffs = edge_data.get("cent_diffs", [])
voice_crossing = edge_data.get("voice_crossing", False)
is_directly_tunable = edge_data.get("is_directly_tunable", False)
if melodic_min is not None or melodic_max is not None:
all_within_range = True
for cents in cent_diffs:
if melodic_min is not None and cents < melodic_min:
all_within_range = False
break
if melodic_max is not None and cents > melodic_max:
all_within_range = False
break
if all_within_range:
w *= 10
else:
w = 0.0
if w == 0.0:
weights.append(w)
continue
if config.get("contrary_motion", False):
if len(cent_diffs) >= 3:
sorted_diffs = sorted(cent_diffs)
if sorted_diffs[0] < 0 and sorted_diffs[-1] > 0:
w *= 100
if config.get("direct_tuning", False):
if is_directly_tunable:
w *= 10
if not config.get("voice_crossing_allowed", False):
if edge_data.get("voice_crossing", False):
w = 0.0
if config.get("hamiltonian", False):
destination = edge[1]
if graph_path and destination in graph_path:
w *= 0.1
else:
w *= 10
if dca_multiplier > 0 and voice_stay_count is not None and len(path) > 0:
source_chord = path[-1]
movements = edge_data.get("movements", {})
move_boost = 1.0
for voice_idx in range(len(voice_stay_count)):
if voice_idx in movements:
dest_idx = movements[voice_idx]
if dest_idx != voice_idx:
stay_count = voice_stay_count[voice_idx]
move_boost *= dca_multiplier**stay_count
w *= move_boost
weights.append(w)
return weights
def is_hamiltonian(self, path: list["Chord"]) -> bool:
"""Check if a path is Hamiltonian (visits all nodes exactly once)."""
return len(path) == len(self.graph.nodes()) and len(set(path)) == len(path)

396
src/harmonic_space.py Normal file
View file

@ -0,0 +1,396 @@
#!/usr/bin/env python
"""
Harmonic space - multidimensional lattice where dimensions = prime factors.
HS_l = harmonic space with first l primes
CHS_l = collapsed harmonic space
"""
from __future__ import annotations
from typing import Iterator
import networkx as nx
from .pitch import Pitch, DIMS_7
from .chord import Chord
class HarmonicSpace:
"""
Harmonic space HS_l or collapsed harmonic space CHS_l.
A multidimensional lattice where each dimension corresponds to a prime factor.
"""
def __init__(self, dims: tuple[int, ...] = DIMS_7, collapsed: bool = True):
self.dims = dims
self.collapsed = collapsed
def __repr__(self) -> str:
suffix = " (collapsed)" if self.collapsed else ""
return f"HarmonicSpace({self.dims}{suffix})"
def pitch(self, hs_array: tuple[int, ...]) -> Pitch:
"""Create a Pitch in this space."""
return Pitch(hs_array, self.dims)
def chord(self, pitches: tuple[Pitch, ...]) -> Chord:
"""Create a Chord in this space."""
return Chord(pitches, self.dims)
def root(self) -> Pitch:
"""Get the root pitch (1/1)."""
return self.pitch(tuple(0 for _ in self.dims))
def _branch_from(self, vertex: tuple[int, ...]) -> set[tuple[int, ...]]:
"""
Get all vertices adjacent to the given vertex.
For collapsed harmonic space, skip dimension 0 (the octave dimension).
"""
branches = set()
start_dim = 1 if self.collapsed else 0
for i in range(start_dim, len(self.dims)):
for delta in (-1, 1):
branch = list(vertex)
branch[i] += delta
branches.add(tuple(branch))
return branches
def generate_connected_sets(
self, min_size: int, max_size: int, collapsed: bool = True
) -> set[Chord]:
"""
Generate all unique connected sets of a given size.
Args:
min_size: Minimum number of pitches in a chord
max_size: Maximum number of pitches in a chord
collapsed: If True, use CHS (skip dim 0 in branching).
If False (default), include dim 0 in branching.
Returns:
Set of unique Chord objects
"""
root = tuple(0 for _ in self.dims)
def branch_from(vertex):
"""Get adjacent vertices. Skip dim 0 for CHS."""
branches = set()
start_dim = 1 if collapsed else 0
for i in range(start_dim, len(self.dims)):
for delta in (-1, 1):
branch = list(vertex)
branch[i] += delta
branches.add(tuple(branch))
return branches
def grow(
chord: tuple[tuple[int, ...], ...],
connected: set[tuple[int, ...]],
visited: set[tuple[int, ...]],
) -> Iterator[tuple[tuple[int, ...], ...]]:
"""Recursively grow connected sets."""
if min_size <= len(chord) <= max_size:
if collapsed:
projected = []
for arr in chord:
p = self.pitch(arr)
projected.append(p.project().hs_array)
yield tuple(projected)
else:
yield chord
if len(chord) < max_size:
visited = set(visited)
for b in connected:
if b not in visited:
extended = chord + (b,)
new_connected = connected | branch_from(b)
visited.add(b)
yield from grow(extended, new_connected, visited)
connected = branch_from(root)
visited = {root}
results = set()
for chord_arrays in grow((root,), connected, visited):
pitches = tuple(self.pitch(arr) for arr in chord_arrays)
sorted_pitches = tuple(sorted(pitches, key=lambda p: p.to_fraction()))
results.add(Chord(sorted_pitches, self.dims))
return results
def build_voice_leading_graph(
self,
chords: set[Chord],
symdiff_min: int = 2,
symdiff_max: int = 2,
) -> nx.MultiDiGraph:
"""
Build a voice leading graph from a set of chords.
Args:
chords: Set of Chord objects
symdiff_min: Minimum symmetric difference between chords
symdiff_max: Maximum symmetric difference between chords
Returns:
NetworkX MultiDiGraph
"""
from itertools import combinations
symdiff_range = (symdiff_min, symdiff_max)
graph = nx.MultiDiGraph()
for chord in chords:
graph.add_node(chord)
for c1, c2 in combinations(chords, 2):
edges = self._find_valid_edges(c1, c2, symdiff_range)
for edge_data in edges:
(
trans,
weight,
movements,
cent_diffs,
voice_crossing,
is_directly_tunable,
) = edge_data
graph.add_edge(
c1,
c2,
transposition=trans,
weight=weight,
movements=movements,
cent_diffs=cent_diffs,
voice_crossing=voice_crossing,
is_directly_tunable=is_directly_tunable,
)
graph.add_edge(
c2,
c1,
transposition=self._invert_transposition(trans),
weight=weight,
movements=self._reverse_movements(movements),
cent_diffs=list(reversed(cent_diffs)),
voice_crossing=voice_crossing,
is_directly_tunable=is_directly_tunable,
)
return graph
def _reverse_movements(self, movements: dict) -> dict:
"""Reverse the movement mappings (index to index)."""
reversed_movements = {}
for src_idx, dest_idx in movements.items():
reversed_movements[dest_idx] = src_idx
return reversed_movements
def _is_directly_tunable(
self,
c1_pitches: tuple[Pitch, ...],
c2_transposed_pitches: tuple[Pitch, ...],
movements: dict,
) -> bool:
"""Check if all changing pitches are adjacent (directly tunable) to a staying pitch."""
staying_indices = [i for i in range(len(c1_pitches)) if movements.get(i) == i]
if not staying_indices:
return False
changing_indices = [
i for i in range(len(c1_pitches)) if i not in staying_indices
]
if not changing_indices:
return True
for ch_idx in changing_indices:
ch_pitch = c2_transposed_pitches[ch_idx]
is_adjacent_to_staying = False
for st_idx in staying_indices:
st_pitch = c1_pitches[st_idx]
if self._is_adjacent_pitches(st_pitch, ch_pitch):
is_adjacent_to_staying = True
break
if not is_adjacent_to_staying:
return False
return True
def _find_valid_edges(
self,
c1: Chord,
c2: Chord,
symdiff_range: tuple[int, int],
) -> list[tuple[Pitch, float, dict, list[float], bool, bool]]:
"""Find all valid edges between two chords."""
from itertools import combinations as iter_combinations
edges = []
transpositions = {
p1.pitch_difference(p2) for p1 in c1.pitches for p2 in c2.pitches
}
for trans in transpositions:
c2_transposed = c2.transpose(trans)
symdiff = self._calc_symdiff(c1, c2_transposed)
if not (symdiff_range[0] <= symdiff <= symdiff_range[1]):
continue
voice_lead_ok = self._check_voice_leading_connectivity(c1, c2_transposed)
if not voice_lead_ok:
continue
movement_maps = self._build_movement_maps(c1.pitches, c2_transposed.pitches)
for movements in movement_maps:
cent_diffs = []
for src_idx, dest_idx in movements.items():
src_pitch = c1.pitches[src_idx]
dst_pitch = c2_transposed.pitches[dest_idx]
cents = abs(src_pitch.to_cents() - dst_pitch.to_cents())
cent_diffs.append(cents)
source = list(c1.pitches)
destination = [None] * len(source)
for src_idx, dest_idx in movements.items():
destination[dest_idx] = c2_transposed.pitches[src_idx]
voice_crossing = False
for i in range(len(destination) - 1):
if destination[i] is None or destination[i + 1] is None:
continue
if destination[i].to_fraction() >= destination[i + 1].to_fraction():
voice_crossing = True
break
is_directly_tunable = self._is_directly_tunable(
c1.pitches, c2_transposed.pitches, movements
)
edges.append(
(
trans,
1.0,
movements,
cent_diffs,
voice_crossing,
is_directly_tunable,
)
)
return edges
def _build_movement_maps(
self, c1_pitches: tuple[Pitch, ...], c2_transposed_pitches: tuple[Pitch, ...]
) -> list[dict]:
"""Build all valid movement maps for c1 -> c2_transposed."""
from itertools import permutations
c1_collapsed = [p.collapse() for p in c1_pitches]
c2_collapsed = [p.collapse() for p in c2_transposed_pitches]
common_indices_c1 = []
common_indices_c2 = []
for i, pc1 in enumerate(c1_collapsed):
for j, pc2 in enumerate(c2_collapsed):
if pc1 == pc2:
common_indices_c1.append(i)
common_indices_c2.append(j)
break
changing_indices_c1 = [
i for i in range(len(c1_pitches)) if i not in common_indices_c1
]
changing_indices_c2 = [
i for i in range(len(c2_transposed_pitches)) if i not in common_indices_c2
]
base_map = {}
for i in common_indices_c1:
dest_idx = common_indices_c2[common_indices_c1.index(i)]
base_map[i] = dest_idx
if not changing_indices_c1:
return [base_map]
c1_changing = [c1_pitches[i] for i in changing_indices_c1]
c2_changing = [c2_transposed_pitches[i] for i in changing_indices_c2]
valid_pairings = []
for p1 in c1_changing:
pairings = []
for p2 in c2_changing:
if self._is_adjacent_pitches(p1, p2):
cents = abs(p1.to_cents() - p2.to_cents())
pairings.append((p1, p2, cents))
valid_pairings.append(pairings)
all_maps = []
num_changing = len(c2_changing)
for perm in permutations(range(num_changing)):
new_map = dict(base_map)
valid = True
for i, c1_idx in enumerate(changing_indices_c1):
dest_idx = changing_indices_c2[perm[i]]
new_map[c1_idx] = dest_idx
if valid:
all_maps.append(new_map)
return all_maps
def _calc_symdiff(self, c1: Chord, c2: Chord) -> int:
"""Calculate symmetric difference between two chords."""
set1 = set(c1.pitches)
set2 = set(c2.pitches)
return len(set1.symmetric_difference(set2))
def _check_voice_leading_connectivity(self, c1: Chord, c2: Chord) -> bool:
"""Check that each pitch that changes is connected (adjacent in lattice) to some pitch in the previous chord."""
c1_pitches = set(c1.pitches)
c2_pitches = set(c2.pitches)
common = c1_pitches & c2_pitches
changing = c2_pitches - c1_pitches
if not changing:
return False
for p2 in changing:
is_adjacent = False
for p1 in c1_pitches:
if self._is_adjacent_pitches(p1, p2):
is_adjacent = True
break
if not is_adjacent:
return False
return True
def _is_adjacent_pitches(self, p1: Pitch, p2: Pitch) -> bool:
"""Check if two collapsed pitches are adjacent (differ by ±1 in one dimension)."""
diff_count = 0
for d in range(1, len(self.dims)):
diff = abs(p1[d] - p2[d])
if diff > 1:
return False
if diff == 1:
diff_count += 1
return diff_count == 1
def _invert_transposition(self, trans: Pitch) -> Pitch:
"""Invert a transposition."""
return Pitch(tuple(-t for t in trans.hs_array), self.dims)

419
src/io.py Normal file
View file

@ -0,0 +1,419 @@
#!/usr/bin/env python
"""
I/O functions and CLI main entry point.
"""
import json
from fractions import Fraction
from pathlib import Path
from random import seed
def write_chord_sequence(seq: list["Chord"], path: str) -> None:
"""Write a chord sequence to a JSON file."""
serializable = []
for chord in seq:
chord_data = []
for pitch in chord._pitches:
chord_data.append(
{
"hs_array": list(pitch.hs_array),
"fraction": str(pitch.to_fraction()),
"cents": pitch.to_cents(),
}
)
serializable.append(chord_data)
content = json.dumps(serializable, indent=2)
content = content.replace("[[[", "[\n\t[[")
content = content.replace(", [[", ",\n\t[[")
content = content.replace("]]]", "]]\n]")
with open(path, "w") as f:
f.write(content)
def write_chord_sequence_readable(seq: list["Chord"], path: str) -> None:
"""Write chord sequence as tuple of hs_arrays - one line per chord."""
with open(path, "w") as f:
f.write("(\n")
for i, chord in enumerate(seq):
arrays = tuple(p.hs_array for p in chord._pitches)
f.write(f" {arrays},\n")
f.write(")\n")
def write_chord_sequence_frequencies(
seq: list["Chord"], path: str, fundamental: float = 100.0
) -> None:
"""Write chord sequence as frequencies in Hz - one line per chord."""
with open(path, "w") as f:
f.write("(\n")
for chord in seq:
freqs = tuple(fundamental * float(p.to_fraction()) for p in chord._pitches)
f.write(f" {freqs},\n")
f.write(")\n")
def graph_to_dict(graph: "nx.MultiDiGraph") -> dict:
"""Serialize graph to a dict for JSON."""
from .pitch import Pitch
from .chord import Chord
nodes = []
node_to_idx = {}
for idx, chord in enumerate(graph.nodes()):
nodes.append(
{
"pitches": [list(p.hs_array) for p in chord.pitches],
"dims": list(chord.dims),
}
)
node_to_idx[id(chord)] = idx
edges = []
for u, v, data in graph.edges(data=True):
edges.append(
{
"src_idx": node_to_idx[id(u)],
"dst_idx": node_to_idx[id(v)],
"transposition": list(
data.get(
"transposition", Pitch(tuple([0] * len(u.dims)), u.dims)
).hs_array
),
"weight": data.get("weight", 1.0),
"movements": {str(k): v for k, v in data.get("movements", {}).items()},
"cent_diffs": data.get("cent_diffs", []),
"voice_crossing": data.get("voice_crossing", False),
"is_directly_tunable": data.get("is_directly_tunable", False),
}
)
return {
"nodes": nodes,
"edges": edges,
}
def graph_from_dict(data: dict) -> "nx.MultiDiGraph":
"""Deserialize graph from dict."""
import networkx as nx
from .pitch import Pitch
from .chord import Chord
nodes = []
for node_data in data["nodes"]:
pitches = tuple(
Pitch(tuple(arr), tuple(node_data["dims"])) for arr in node_data["pitches"]
)
nodes.append(Chord(pitches, tuple(node_data["dims"])))
graph = nx.MultiDiGraph()
for node in nodes:
graph.add_node(node)
for edge_data in data["edges"]:
u = nodes[edge_data["src_idx"]]
v = nodes[edge_data["dst_idx"]]
trans = Pitch(tuple(edge_data["transposition"]), u.dims)
movements = {int(k): v for k, v in edge_data["movements"].items()}
graph.add_edge(
u,
v,
transposition=trans,
weight=edge_data.get("weight", 1.0),
movements=movements,
cent_diffs=edge_data.get("cent_diffs", []),
voice_crossing=edge_data.get("voice_crossing", False),
is_directly_tunable=edge_data.get("is_directly_tunable", False),
)
return graph
def save_graph_pickle(graph: "nx.MultiDiGraph", path: str) -> None:
"""Save graph to pickle file."""
import pickle
with open(path, "wb") as f:
pickle.dump(graph, f)
def load_graph_pickle(path: str) -> "nx.MultiDiGraph":
"""Load graph from pickle file."""
import pickle
with open(path, "rb") as f:
return pickle.load(f)
def save_graph_json(graph: "nx.MultiDiGraph", path: str) -> None:
"""Save graph to JSON file."""
data = graph_to_dict(graph)
with open(path, "w") as f:
json.dump(data, f, indent=2)
def load_graph_json(path: str) -> "nx.MultiDiGraph":
"""Load graph from JSON file."""
import json
with open(path, "r") as f:
data = json.load(f)
return graph_from_dict(data)
def get_cache_key(
dims: int, chord_size: int, symdiff_min: int, symdiff_max: int
) -> str:
"""Generate cache key from parameters."""
return f"d{dims}_n{size}_s{min}-{max}".replace("{size}", str(chord_size))
def load_graph_from_cache(
cache_dir: str,
dims: int,
chord_size: int,
symdiff_min: int,
symdiff_max: int,
) -> tuple["nx.MultiDiGraph | None", bool]:
"""
Try to load graph from cache.
Returns:
(graph, was_cached): graph if found, False if not found
"""
cache_key = f"d{dims}_n{chord_size}_s{symdiff_min}-{symdiff_max}"
pkl_path = Path(cache_dir) / f"{cache_key}.pkl"
json_path = Path(cache_dir) / f"{cache_key}.json"
# Try pickle first (faster)
if pkl_path.exists():
try:
graph = load_graph_pickle(str(pkl_path))
return graph, True
except Exception as e:
print(f"Warning: Failed to load pickle cache: {e}")
# Try JSON
if json_path.exists():
try:
graph = load_graph_json(str(json_path))
return graph, True
except Exception as e:
print(f"Warning: Failed to load JSON cache: {e}")
return None, False
def save_graph_to_cache(
graph: "nx.MultiDiGraph",
cache_dir: str,
dims: int,
chord_size: int,
symdiff_min: int,
symdiff_max: int,
) -> None:
"""Save graph to cache in both pickle and JSON formats."""
import os
cache_key = f"d{dims}_n{chord_size}_s{symdiff_min}-{symdiff_max}"
pkl_path = Path(cache_dir) / f"{cache_key}.pkl"
json_path = Path(cache_dir) / f"{cache_key}.json"
os.makedirs(cache_dir, exist_ok=True)
# Save both formats
try:
save_graph_pickle(graph, str(pkl_path))
print(f"Cached to {pkl_path}")
except Exception as e:
print(f"Warning: Failed to save pickle: {e}")
try:
save_graph_json(graph, str(json_path))
print(f"Cached to {json_path}")
except Exception as e:
print(f"Warning: Failed to save JSON: {e}")
def main():
"""Demo: Generate compact sets and build graph."""
import argparse
from .pitch import DIMS_4, DIMS_5, DIMS_7, DIMS_8
from .harmonic_space import HarmonicSpace
from .graph import PathFinder
parser = argparse.ArgumentParser(
description="Generate chord paths in harmonic space"
)
parser.add_argument(
"--symdiff-min",
type=int,
default=2,
help="Minimum symmetric difference between chords",
)
parser.add_argument(
"--symdiff-max",
type=int,
default=2,
help="Maximum symmetric difference between chords",
)
parser.add_argument(
"--melodic-min",
type=int,
default=0,
help="Minimum cents for any pitch movement (0 = no minimum)",
)
parser.add_argument(
"--melodic-max",
type=int,
default=500,
help="Maximum cents for any pitch movement (0 = no maximum)",
)
parser.add_argument(
"--dca",
type=float,
default=2.0,
help="DCA (Dissonant Counterpoint Algorithm) multiplier for voice momentum (0 to disable)",
)
parser.add_argument(
"--allow-voice-crossing",
action="store_true",
help="Allow edges where voices cross (default: reject)",
)
parser.add_argument(
"--dims", type=int, default=7, help="Number of prime dimensions (4, 5, 7, or 8)"
)
parser.add_argument("--chord-size", type=int, default=3, help="Size of chords")
parser.add_argument("--max-path", type=int, default=50, help="Maximum path length")
parser.add_argument(
"--seed", type=int, default=None, help="Random seed (default: random)"
)
parser.add_argument(
"--cache-dir",
type=str,
default="./cache",
help="Cache directory for graphs",
)
parser.add_argument(
"--rebuild-cache",
action="store_true",
help="Force rebuild graph (ignore cache)",
)
parser.add_argument(
"--no-cache",
action="store_true",
help="Disable caching",
)
parser.add_argument(
"--output-dir",
type=str,
default="output",
help="Output directory for generated files",
)
args = parser.parse_args()
# Select dims
if args.dims == 4:
dims = DIMS_4
elif args.dims == 5:
dims = DIMS_5
elif args.dims == 7:
dims = DIMS_7
elif args.dims == 8:
dims = DIMS_8
else:
dims = DIMS_7
space = HarmonicSpace(dims, collapsed=True)
print(f"Space: {space}")
print(f"Symdiff: {args.symdiff_min} to {args.symdiff_max}")
# Try to load from cache
graph = None
was_cached = False
if not args.no_cache and not args.rebuild_cache:
graph, was_cached = load_graph_from_cache(
args.cache_dir,
args.dims,
args.chord_size,
args.symdiff_min,
args.symdiff_max,
)
if was_cached:
print(f"Loaded graph from cache")
print(
f"Graph: {graph.number_of_nodes()} nodes, {graph.number_of_edges()} edges"
)
# Build graph if not loaded from cache
if graph is None:
print("Generating connected sets...")
chords = space.generate_connected_sets(
min_size=args.chord_size, max_size=args.chord_size
)
print(f"Found {len(chords)} unique chords")
print("Building voice leading graph...")
graph = space.build_voice_leading_graph(
chords,
symdiff_min=args.symdiff_min,
symdiff_max=args.symdiff_max,
)
print(
f"Graph: {graph.number_of_nodes()} nodes, {graph.number_of_edges()} edges"
)
# Save to cache
if not args.no_cache:
save_graph_to_cache(
graph,
args.cache_dir,
args.dims,
args.chord_size,
args.symdiff_min,
args.symdiff_max,
)
# Find stochastic path
print("Finding stochastic path...")
path_finder = PathFinder(graph)
if args.seed is not None:
seed(args.seed)
weights_config = path_finder._default_weights_config()
weights_config["melodic_threshold_min"] = args.melodic_min
weights_config["melodic_threshold_max"] = args.melodic_max
weights_config["dca"] = args.dca
weights_config["voice_crossing_allowed"] = args.allow_voice_crossing
path = path_finder.find_stochastic_path(
max_length=args.max_path, weights_config=weights_config
)
print(f"Path length: {len(path)}")
# Create output directory and write files
import os
os.makedirs(args.output_dir, exist_ok=True)
write_chord_sequence(path, os.path.join(args.output_dir, "output_chords.json"))
print(f"Written to {args.output_dir}/output_chords.json")
write_chord_sequence_readable(
path, os.path.join(args.output_dir, "output_chords.txt")
)
print(f"Written to {args.output_dir}/output_chords.txt")
write_chord_sequence_frequencies(
path, os.path.join(args.output_dir, "output_frequencies.txt")
)
print(f"Written to {args.output_dir}/output_frequencies.txt")
if __name__ == "__main__":
main()

92
src/pitch.py Normal file
View file

@ -0,0 +1,92 @@
#!/usr/bin/env python
"""
Pitch class - a point in harmonic space.
Represented as an array of exponents on prime dimensions.
Example: (0, 1, 0, 0) represents 3/2 (perfect fifth) in CHS_7
"""
from __future__ import annotations
from fractions import Fraction
from math import log
from operator import add
from typing import Iterator
DIMS_8 = (2, 3, 5, 7, 11, 13, 17, 19)
DIMS_7 = (2, 3, 5, 7, 11, 13, 17)
DIMS_5 = (2, 3, 5, 7, 11)
DIMS_4 = (2, 3, 5, 7)
class Pitch:
def __init__(self, hs_array: tuple[int, ...], dims: tuple[int, ...] | None = None):
self.hs_array = hs_array
self.dims = dims if dims is not None else DIMS_7
def __hash__(self) -> int:
return hash(self.hs_array)
def __eq__(self, other: object) -> bool:
if not isinstance(other, Pitch):
return NotImplemented
return self.hs_array == other.hs_array
def __repr__(self) -> str:
return f"Pitch({self.hs_array})"
def __iter__(self):
return iter(self.hs_array)
def __len__(self) -> int:
return len(self.hs_array)
def __getitem__(self, index: int) -> int:
return self.hs_array[index]
def to_fraction(self) -> Fraction:
"""Convert to frequency ratio (e.g., 3/2)."""
from math import prod
return Fraction(
prod(pow(self.dims[d], self.hs_array[d]) for d in range(len(self.dims)))
)
def to_cents(self) -> float:
"""Convert to cents (relative to 1/1 = 0 cents)."""
fr = self.to_fraction()
return 1200 * log(float(fr), 2)
def collapse(self) -> Pitch:
"""
Collapse pitch so frequency ratio is in [1, 2).
This removes octave information, useful for pitch classes.
"""
collapsed = list(self.hs_array)
fr = self.to_fraction()
if fr < 1:
while fr < 1:
fr *= 2
collapsed[0] += 1
elif fr >= 2:
while fr >= 2:
fr /= 2
collapsed[0] -= 1
return Pitch(tuple(collapsed), self.dims)
def project(self) -> Pitch:
"""Project pitch to [1, 2) range - same as collapse."""
return self.collapse()
def transpose(self, trans: Pitch) -> Pitch:
"""Transpose by another pitch (add exponents element-wise)."""
return Pitch(tuple(map(add, self.hs_array, trans.hs_array)), self.dims)
def pitch_difference(self, other: Pitch) -> Pitch:
"""Calculate the pitch difference (self - other)."""
return Pitch(
tuple(self.hs_array[d] - other.hs_array[d] for d in range(len(self.dims))),
self.dims,
)