compact_sets/src/io.py

688 lines
21 KiB
Python
Raw Normal View History

#!/usr/bin/env python
"""
I/O functions and CLI main entry point.
"""
import json
from fractions import Fraction
from pathlib import Path
from random import seed
def write_chord_sequence(seq: list["Chord"], path: str) -> None:
    """Write a chord sequence to a JSON file.

    Emits a hand-formatted JSON array of chords, each chord being an
    array of pitch objects with "hs_array", "fraction" and "cents" keys.
    """
    out = ["["]
    last_chord = len(seq) - 1
    for ci, chord in enumerate(seq):
        out.append(" [")
        pitch_count = len(chord._pitches)
        for pi, pitch in enumerate(chord._pitches):
            entry = json.dumps(
                {
                    "hs_array": list(pitch.hs_array),
                    "fraction": str(pitch.to_fraction()),
                    "cents": pitch.to_cents(),
                }
            )
            # Trailing comma on every pitch line except the last in the chord.
            suffix = "," if pi < pitch_count - 1 else ""
            out.append(f" {entry}{suffix}")
        # Trailing comma on every chord except the last in the sequence.
        out.append(" ]" if ci == last_chord else " ],")
    out.append("]")
    with open(path, "w") as f:
        f.write("\n".join(out))
def write_chord_sequence_readable(seq: list["Chord"], path: str) -> None:
    """Write chord sequence as tuple of hs_arrays - one line per chord.

    Output is a Python-literal-style tuple: one "(hs_array, ...)" row
    per chord, wrapped in outer parentheses.
    """
    with open(path, "w") as f:
        f.write("(\n")
        for chord in seq:
            row = tuple(p.hs_array for p in chord._pitches)
            f.write(f" {row},\n")
        f.write(")\n")
def write_chord_sequence_frequencies(
    seq: list["Chord"], path: str, fundamental: float = 100.0
) -> None:
    """Write chord sequence as frequencies in Hz - one line per chord.

    Each pitch ratio is scaled by `fundamental` (Hz) and written as a
    tuple literal, one row per chord.
    """
    with open(path, "w") as f:
        f.write("(\n")
        for chord in seq:
            row = tuple(
                fundamental * float(pitch.to_fraction()) for pitch in chord._pitches
            )
            f.write(f" {row},\n")
        f.write(")\n")
def _serialize_edge_data(edge_data: dict) -> dict:
"""Serialize edge data for JSON, converting Pitch objects to lists."""
if not edge_data:
return {}
result = {}
for key, value in edge_data.items():
if hasattr(value, "hs_array"):
result[key] = list(value.hs_array)
elif isinstance(value, list) and value and hasattr(value[0], "hs_array"):
result[key] = [list(p.hs_array) for p in value]
else:
result[key] = value
return result
def _format_chord_line(pitches: list) -> str:
"""Format a chord (list of pitch hs_arrays) as a single line."""
return json.dumps([list(p) if hasattr(p, "hs_array") else p for p in pitches])
def _format_edge_data_compact(edge_data: dict) -> dict:
"""Format edge data for compact JSON output."""
result = {}
for key, value in edge_data.items():
if hasattr(value, "hs_array"):
result[key] = list(value.hs_array)
elif isinstance(value, list) and value and hasattr(value[0], "hs_array"):
result[key] = [list(p.hs_array) for p in value]
else:
result[key] = value
return result
def write_path_steps(path: "Path", output_path: str) -> None:
    """Write path with all step data to JSON.

    Serializes each step of the path (chords, transposition, movements,
    optional scores/weight, and edge data) as a hand-formatted JSON
    array of objects.
    """
    out = ["["]
    total = len(path.steps)
    for idx, step in enumerate(path.steps):
        out.append(" {")
        out.append(f' "step": {idx},')
        out.append(f' "source_chord": {_format_chord_line(step.source_chord.pitches)},')
        out.append(
            f' "destination_chord": {_format_chord_line(step.destination_chord.pitches)},'
        )
        if step.transposition:
            out.append(
                f' "transposition": {json.dumps(list(step.transposition.hs_array))},'
            )
        else:
            out.append(' "transposition": null,')
        movements = {str(k): v for k, v in step.movements.items()}
        out.append(f' "movements": {json.dumps(movements)},')
        # Optional fields: only emitted when present/meaningful.
        if step.scores:
            out.append(f' "scores": {json.dumps(step.scores)},')
        if step.normalized_scores:
            out.append(f' "normalized_scores": {json.dumps(step.normalized_scores)},')
        if step.weight is not None:
            out.append(f' "weight": {step.weight},')
        out.append(f' "edge_data": {json.dumps(_format_edge_data_compact(step.edge_data))}')
        # Trailing comma on every step object except the last.
        out.append(" }" + ("," if idx < total - 1 else ""))
    out.append("]")
    with open(output_path, "w") as f:
        f.write("\n".join(out))
def graph_to_dict(graph: "nx.MultiDiGraph") -> dict:
    """Serialize graph to a dict for JSON.

    Nodes become {"pitches", "dims"} records; edges reference nodes by
    index and carry transposition/weight/movement metadata. Inverse of
    graph_from_dict.
    """
    from .pitch import Pitch
    from .chord import Chord

    node_records = []
    index_of = {}  # id(chord) -> node index, for edge endpoint lookup
    for idx, chord in enumerate(graph.nodes()):
        node_records.append(
            {
                "pitches": [list(p.hs_array) for p in chord.pitches],
                "dims": list(chord.dims),
            }
        )
        index_of[id(chord)] = idx

    edge_records = []
    for u, v, data in graph.edges(data=True):
        # Default to the zero (identity) transposition when absent.
        zero = Pitch(tuple([0] * len(u.dims)), u.dims)
        edge_records.append(
            {
                "src_idx": index_of[id(u)],
                "dst_idx": index_of[id(v)],
                "transposition": list(data.get("transposition", zero).hs_array),
                "weight": data.get("weight", 1.0),
                "movements": {str(k): v for k, v in data.get("movements", {}).items()},
                "cent_diffs": data.get("cent_diffs", []),
                "voice_crossing": data.get("voice_crossing", False),
                "is_directly_tunable": data.get("is_directly_tunable", False),
            }
        )
    return {"nodes": node_records, "edges": edge_records}
def graph_from_dict(data: dict) -> "nx.MultiDiGraph":
    """Deserialize graph from dict.

    Rebuilds Chord nodes from their pitch hs_arrays, then re-adds edges
    by node index with their serialized attributes. Inverse of
    graph_to_dict.
    """
    import networkx as nx
    from .pitch import Pitch
    from .chord import Chord

    chords = []
    for record in data["nodes"]:
        dims = tuple(record["dims"])
        pitch_tuple = tuple(Pitch(tuple(arr), dims) for arr in record["pitches"])
        chords.append(Chord(pitch_tuple, dims))

    graph = nx.MultiDiGraph()
    graph.add_nodes_from(chords)
    for record in data["edges"]:
        src = chords[record["src_idx"]]
        dst = chords[record["dst_idx"]]
        graph.add_edge(
            src,
            dst,
            transposition=Pitch(tuple(record["transposition"]), src.dims),
            weight=record.get("weight", 1.0),
            # JSON keys are strings; movement keys are voice indices (ints).
            movements={int(k): v for k, v in record["movements"].items()},
            cent_diffs=record.get("cent_diffs", []),
            voice_crossing=record.get("voice_crossing", False),
            is_directly_tunable=record.get("is_directly_tunable", False),
        )
    return graph
def save_graph_pickle(graph: "nx.MultiDiGraph", path: str) -> None:
    """Save graph to pickle file (binary)."""
    import pickle

    with open(path, "wb") as fh:
        pickle.dump(graph, fh)
def load_graph_pickle(path: str) -> "nx.MultiDiGraph":
    """Load graph from pickle file.

    NOTE: unpickling is only safe on trusted (self-written cache) files.
    """
    import pickle

    with open(path, "rb") as fh:
        return pickle.load(fh)
def save_graph_json(graph: "nx.MultiDiGraph", path: str) -> None:
    """Save graph to JSON file (via graph_to_dict, indent=2)."""
    serialized = graph_to_dict(graph)
    with open(path, "w") as fh:
        json.dump(serialized, fh, indent=2)
def load_graph_json(path: str) -> "nx.MultiDiGraph":
    """Load graph from JSON file.

    Reads the dict produced by save_graph_json and rebuilds the graph
    via graph_from_dict.
    """
    # Fix: removed redundant function-local `import json` — the module
    # is already imported at file scope.
    with open(path, "r") as f:
        data = json.load(f)
    return graph_from_dict(data)
def get_cache_key(
    dims: int, chord_size: int, symdiff_min: int, symdiff_max: int
) -> str:
    """Generate cache key from parameters.

    Format: "d{dims}_n{chord_size}_s{symdiff_min}-{symdiff_max}",
    matching the keys built inline in load_graph_from_cache and
    save_graph_to_cache.
    """
    # Bug fix: the previous f-string referenced an undefined name `size`
    # (raising NameError) and the builtins `min`/`max` instead of the
    # symdiff parameters; the subsequent .replace("{size}", ...) was a
    # no-op because the f-string had already been evaluated.
    return f"d{dims}_n{chord_size}_s{symdiff_min}-{symdiff_max}"
def load_graph_from_cache(
    cache_dir: str,
    dims: int,
    chord_size: int,
    symdiff_min: int,
    symdiff_max: int,
) -> tuple["nx.MultiDiGraph | None", bool]:
    """Try to load graph from cache.

    Returns:
        (graph, was_cached): the cached graph and True on a hit;
        (None, False) on a miss or if both cache files are unreadable.
    """
    key = f"d{dims}_n{chord_size}_s{symdiff_min}-{symdiff_max}"
    base = Path(cache_dir)
    pkl_file = base / f"{key}.pkl"
    json_file = base / f"{key}.json"

    # Pickle is preferred because it deserializes faster.
    if pkl_file.exists():
        try:
            return load_graph_pickle(str(pkl_file)), True
        except Exception as exc:
            print(f"Warning: Failed to load pickle cache: {exc}")

    # Fall back to the JSON copy.
    if json_file.exists():
        try:
            return load_graph_json(str(json_file)), True
        except Exception as exc:
            print(f"Warning: Failed to load JSON cache: {exc}")

    return None, False
def save_graph_to_cache(
    graph: "nx.MultiDiGraph",
    cache_dir: str,
    dims: int,
    chord_size: int,
    symdiff_min: int,
    symdiff_max: int,
) -> None:
    """Save graph to cache in both pickle and JSON formats.

    Failures in either format are reported as warnings and never
    propagated to the caller; the cache directory is created if needed.
    """
    import os

    key = f"d{dims}_n{chord_size}_s{symdiff_min}-{symdiff_max}"
    os.makedirs(cache_dir, exist_ok=True)

    pkl_file = Path(cache_dir) / f"{key}.pkl"
    try:
        save_graph_pickle(graph, str(pkl_file))
        print(f"Cached to {pkl_file}")
    except Exception as exc:
        print(f"Warning: Failed to save pickle: {exc}")

    json_file = Path(cache_dir) / f"{key}.json"
    try:
        save_graph_json(graph, str(json_file))
        print(f"Cached to {json_file}")
    except Exception as exc:
        print(f"Warning: Failed to save JSON: {exc}")
def main():
    """Demo: Generate compact sets and build graph.

    CLI entry point with three modes:
      * default: generate chords, build (or load cached) voice-leading
        graph, find a stochastic path, and write output files;
      * --transcribe: convert a chords JSON file to LilyPond and exit;
      * --osc-play: play back a chords JSON file over OSC and exit.
    """
    import argparse
    from .dims import DIMS_4, DIMS_5, DIMS_7, DIMS_8
    from .harmonic_space import HarmonicSpace
    from .pathfinder import PathFinder

    parser = argparse.ArgumentParser(
        description="Generate chord paths in harmonic space"
    )
    # --- Hard constraints on chord-to-chord transitions ---
    parser.add_argument(
        "--symdiff-min",
        type=int,
        default=2,
        help="Minimum symmetric difference between chords",
    )
    parser.add_argument(
        "--symdiff-max",
        type=int,
        default=2,
        help="Maximum symmetric difference between chords",
    )
    parser.add_argument(
        "--melodic-min",
        type=int,
        default=0,
        help="Minimum cents for any pitch movement (0 = no minimum)",
    )
    parser.add_argument(
        "--melodic-max",
        type=int,
        default=500,
        help="Maximum cents for any pitch movement (0 = no maximum)",
    )
    parser.add_argument(
        "--target-register",
        type=float,
        default=0,
        help="Target register in octaves (default: disabled, 2 = two octaves)",
    )
    parser.add_argument(
        "--allow-voice-crossing",
        action="store_true",
        help="Allow edges where voices cross (default: reject)",
    )
    parser.add_argument(
        "--disable-direct-tuning",
        action="store_true",
        help="Disable direct tuning requirement (default: require)",
    )
    # --- Soft scoring-factor weights (0 disables a factor) ---
    parser.add_argument(
        "--weight-melodic",
        type=float,
        default=1,
        help="Weight for melodic threshold factor (0=disabled, default: 1)",
    )
    parser.add_argument(
        "--weight-contrary-motion",
        type=float,
        default=0,
        help="Weight for contrary motion factor (0=disabled, default: 0)",
    )
    parser.add_argument(
        "--weight-dca-hamiltonian",
        type=float,
        default=1,
        help="Weight for DCA Hamiltonian factor - favors long-unvisited nodes (0=disabled, default: 1)",
    )
    parser.add_argument(
        "--weight-dca-voice-movement",
        type=float,
        default=1,
        help="Weight for DCA voice movement factor - favors voices that stay long to change (0=disabled, default: 1)",
    )
    parser.add_argument(
        "--weight-target-register",
        type=float,
        default=1,
        help="Weight for target register factor (0=disabled, default: 1)",
    )
    # --- Space and search-size parameters ---
    parser.add_argument(
        "--dims", type=int, default=7, help="Number of prime dimensions (4, 5, 7, or 8)"
    )
    parser.add_argument("--chord-size", type=int, default=3, help="Size of chords")
    parser.add_argument("--max-path", type=int, default=50, help="Maximum path length")
    parser.add_argument(
        "--seed", type=int, default=None, help="Random seed (default: random)"
    )
    # --- Caching and output locations ---
    parser.add_argument(
        "--cache-dir",
        type=str,
        default="./cache",
        help="Cache directory for graphs",
    )
    parser.add_argument(
        "--rebuild-cache",
        action="store_true",
        help="Force rebuild graph (ignore cache)",
    )
    parser.add_argument(
        "--no-cache",
        action="store_true",
        help="Disable caching",
    )
    parser.add_argument(
        "--output-dir",
        type=str,
        default="output",
        help="Output directory for generated files",
    )
    parser.add_argument(
        "--stats",
        action="store_true",
        help="Show analysis statistics after generation",
    )
    parser.add_argument(
        "--output-path-steps",
        action="store_true",
        help="Export path steps with full edge data to path_steps.json",
    )
    # --- Standalone playback / transcription modes ---
    parser.add_argument(
        "--osc-play",
        nargs="?",
        const="output/output_chords.json",
        default=None,
        help="Enable OSC playback (optionally specify chord file, default: output/output_chords.json)",
    )
    parser.add_argument(
        "--transcribe",
        nargs="*",
        metavar=("INPUT", "OUTPUT"),
        default=None,
        help="Generate LilyPond transcription (optionally: input_file output_name)",
    )
    parser.add_argument(
        "--osc-ip",
        type=str,
        default="192.168.4.200",
        help="OSC destination IP (default: 192.168.4.200)",
    )
    parser.add_argument(
        "--osc-port",
        type=int,
        default=54001,
        help="OSC destination port (default: 54001)",
    )
    parser.add_argument(
        "--fundamental",
        type=float,
        default=100,
        help="Fundamental frequency in Hz for OSC output (default: 100)",
    )
    args = parser.parse_args()

    # Handle transcription mode (separate from generation)
    if args.transcribe is not None:
        import json
        from .transcriber import transcribe

        # Positional defaults: nargs="*" may supply 0, 1, or 2 values.
        input_file = (
            args.transcribe[0]
            if len(args.transcribe) > 0
            else "output/output_chords.json"
        )
        output_name = (
            args.transcribe[1]
            if len(args.transcribe) > 1
            else "compact_sets_transcription"
        )
        with open(input_file) as f:
            chords = json.load(f)
        print(f"Transcribing {len(chords)} chords from {input_file}")
        transcribe(
            chords,
            output_name,
            fundamental=args.fundamental,
        )
        return  # Exit after transcribing

    # Handle OSC playback mode (separate from generation)
    if args.osc_play:
        from .osc_sender import OSCSender

        chords_file = args.osc_play
        sender = OSCSender(
            ip=args.osc_ip, port=args.osc_port, fundamental=args.fundamental
        )
        sender.load_chords(chords_file)
        print(f"OSC playback from: {chords_file}")
        print(f"Destination: {args.osc_ip}:{args.osc_port}")
        print(f"Fundamental: {args.fundamental} Hz")
        sender.play()
        return  # Exit after OSC playback

    # Select dims
    if args.dims == 4:
        dims = DIMS_4
    elif args.dims == 5:
        dims = DIMS_5
    elif args.dims == 7:
        dims = DIMS_7
    elif args.dims == 8:
        dims = DIMS_8
    else:
        # Unrecognized values silently fall back to the 7-dimension space.
        dims = DIMS_7

    space = HarmonicSpace(dims, collapsed=True)
    print(f"Space: {space}")
    print(f"Symdiff: {args.symdiff_min} to {args.symdiff_max}")

    # Try to load from cache
    graph = None
    was_cached = False
    if not args.no_cache and not args.rebuild_cache:
        graph, was_cached = load_graph_from_cache(
            args.cache_dir,
            args.dims,
            args.chord_size,
            args.symdiff_min,
            args.symdiff_max,
        )
        if was_cached:
            print(f"Loaded graph from cache")
            print(
                f"Graph: {graph.number_of_nodes()} nodes, {graph.number_of_edges()} edges"
            )

    # Build graph if not loaded from cache
    if graph is None:
        print("Generating connected sets...")
        chords = space.generate_connected_sets(
            min_size=args.chord_size, max_size=args.chord_size
        )
        print(f"Found {len(chords)} unique chords")
        print("Building voice leading graph...")
        graph = space.build_voice_leading_graph(
            chords,
            symdiff_min=args.symdiff_min,
            symdiff_max=args.symdiff_max,
        )
        print(
            f"Graph: {graph.number_of_nodes()} nodes, {graph.number_of_edges()} edges"
        )
        # Save to cache
        if not args.no_cache:
            save_graph_to_cache(
                graph,
                args.cache_dir,
                args.dims,
                args.chord_size,
                args.symdiff_min,
                args.symdiff_max,
            )

    # Find stochastic path
    print("Finding stochastic path...")
    path_finder = PathFinder(graph)
    if args.seed is not None:
        # Seeds the module-level `random` state (imported at file top).
        seed(args.seed)
    # NOTE(review): relies on PathFinder's private _default_weights_config;
    # consider a public accessor.
    weights_config = path_finder._default_weights_config()
    weights_config["melodic_threshold_min"] = args.melodic_min
    weights_config["melodic_threshold_max"] = args.melodic_max
    weights_config["voice_crossing_allowed"] = args.allow_voice_crossing
    weights_config["direct_tuning"] = not args.disable_direct_tuning
    # Soft factor weights
    weights_config["weight_melodic"] = args.weight_melodic
    weights_config["weight_contrary_motion"] = args.weight_contrary_motion
    weights_config["weight_dca_hamiltonian"] = args.weight_dca_hamiltonian
    weights_config["weight_dca_voice_movement"] = args.weight_dca_voice_movement
    # Target register
    if args.target_register > 0:
        weights_config["target_register"] = True
        weights_config["target_register_octaves"] = args.target_register
        weights_config["weight_target_register"] = args.weight_target_register
    else:
        weights_config["weight_target_register"] = 0  # disabled
    weights_config["max_path"] = args.max_path
    path_obj = path_finder.find_stochastic_path(
        max_length=args.max_path, weights_config=weights_config
    )
    print(f"Path length: {len(path_obj)}")

    # Create output directory and write files
    import os

    os.makedirs(args.output_dir, exist_ok=True)
    # Save graph_path for Hamiltonian analysis
    import json

    graph_path_data = [hash(node) for node in path_obj.graph_chords]
    graph_path_file = os.path.join(args.output_dir, "graph_path.json")
    with open(graph_path_file, "w") as f:
        json.dump(graph_path_data, f)
    print(f"Written to {graph_path_file}")
    write_chord_sequence(
        path_obj.output_chords, os.path.join(args.output_dir, "output_chords.json")
    )
    print(f"Written to {args.output_dir}/output_chords.json")
    write_chord_sequence_readable(
        path_obj.output_chords, os.path.join(args.output_dir, "output_chords.txt")
    )
    print(f"Written to {args.output_dir}/output_chords.txt")
    write_chord_sequence_frequencies(
        path_obj.output_chords, os.path.join(args.output_dir, "output_frequencies.txt")
    )
    print(f"Written to {args.output_dir}/output_frequencies.txt")
    if args.output_path_steps:
        write_path_steps(path_obj, os.path.join(args.output_dir, "path_steps.json"))
        print(f"Written to {args.output_dir}/path_steps.json")

    # Show stats if requested
    if args.stats:
        from .analyze import analyze_chords, format_analysis

        config = {
            "melodic_threshold_max": args.melodic_max,
            "target_register_octaves": args.target_register,
            "max_path": args.max_path,
            "graph_nodes": graph.number_of_nodes() if graph else None,
        }
        # Load the chords from the output file
        import json

        chords_file = os.path.join(args.output_dir, "output_chords.json")
        with open(chords_file) as f:
            chords = json.load(f)
        # Load graph_path for Hamiltonian analysis
        graph_path_file = os.path.join(args.output_dir, "graph_path.json")
        graph_path = None
        if os.path.exists(graph_path_file):
            with open(graph_path_file) as f:
                graph_path = json.load(f)
        metrics = analyze_chords(chords, config, graph_path)
        print()
        print(format_analysis(metrics))

    # OSC playback if enabled
    # NOTE(review): this block appears unreachable — the earlier
    # `if args.osc_play:` branch returns before generation runs,
    # so --osc-play never reaches this point. Confirm intent.
    if args.osc_play:
        from .osc_sender import OSCSender

        chords_file = os.path.join(args.output_dir, "output_chords.json")
        sender = OSCSender(
            ip=args.osc_ip, port=args.osc_port, fundamental=args.fundamental
        )
        sender.load_chords(chords_file)
        print(f"\nOSC enabled - sending to {args.osc_ip}:{args.osc_port}")
        print(f"Fundamental: {args.fundamental} Hz")
        sender.play()
# Allow running this module directly as a script.
if __name__ == "__main__":
    main()