compact_sets/src/osc_sender.py

487 lines
18 KiB
Python
Raw Normal View History

#!/usr/bin/env python
"""
OSC Sender - send chord frequencies via OSC for real-time playback.
"""
import json
import sys
import threading
from pathlib import Path
try:
from pythonosc import osc_message_builder
from pythonosc.udp_client import SimpleUDPClient
except ImportError:
print("Error: python-osc not installed. Run: pip install python-osc")
sys.exit(1)
class OSCSender:
"""Send chord frequencies via OSC."""
def __init__(self, ip="192.168.4.200", port=54001, fundamental=100):
self.ip = ip
self.port = port
self.fundamental = fundamental
self.client = SimpleUDPClient(ip, port)
self.chords = []
self.current_index = 0
self.preview_mode = True
self.num_voices = 4 # Default to 4 voices
self.voice_indices = [] # Track position for each voice independently
self._ramp_events = {} # voice_idx -> threading.Event
self._ramp_threads = {} # voice_idx -> thread
self._current_frequencies = {} # voice_idx -> current frequency
def load_chords(self, chords_file):
"""Load chords from output_chords.json."""
with open(chords_file) as f:
chords_data = json.load(f)
# Handle both old format (list) and new format (dict with dims + chords)
if isinstance(chords_data, dict) and "chords" in chords_data:
self.chords = chords_data["chords"]
else:
self.chords = chords_data
# Initialize voice indices to 0 for all voices
if self.chords:
self.num_voices = len(self.chords[0])
self.voice_indices = [0] * self.num_voices
else:
self.num_voices = 0
self.voice_indices = []
print(f"Loaded {len(self.chords)} chords, {self.num_voices} voices")
def set_chords(self, chords):
"""Set chords directly (list of chord pitch arrays)."""
self.chords = chords
if self.chords:
self.num_voices = len(self.chords[0])
self.voice_indices = [0] * self.num_voices
def send_chord(self, index):
"""Send frequencies for a specific chord index."""
if index < 0 or index >= len(self.chords):
return
chord = self.chords[index]
for voice_idx, pitch in enumerate(chord):
from fractions import Fraction
frac = Fraction(pitch["fraction"])
freq = self.fundamental * float(frac)
addr = "/freq"
msg = osc_message_builder.OscMessageBuilder(address=addr)
msg.add_arg(voice_idx + 1) # Voice number (1-indexed)
msg.add_arg(freq) # Frequency
self.client.send(msg.build())
print(f" [Sent chord {index + 1}/{len(self.chords)}]")
def send_voice(self, voice_idx):
"""Send frequency for a single voice at its current position."""
if voice_idx < 0 or voice_idx >= len(self.voice_indices):
return
chord_idx = self.voice_indices[voice_idx]
if chord_idx < 0 or chord_idx >= len(self.chords):
return
chord = self.chords[chord_idx]
pitch = chord[voice_idx]
from fractions import Fraction
frac = Fraction(pitch["fraction"])
freq = self.fundamental * float(frac)
msg = osc_message_builder.OscMessageBuilder(address="/freq")
msg.add_arg(voice_idx + 1) # Voice number (1-indexed)
msg.add_arg(freq)
self.client.send(msg.build())
def send_single(self, frequency, voice):
"""Send a single frequency to a specific voice (1-indexed)."""
msg = osc_message_builder.OscMessageBuilder(address="/freq")
msg.add_arg(voice) # Voice number (1-indexed)
msg.add_arg(frequency)
self.client.send(msg.build())
# Track current frequency for this voice
self._current_frequencies[voice - 1] = frequency
def get_current_frequency(self, voice_idx):
"""Get current frequency for a voice (0-indexed)."""
return self._current_frequencies.get(voice_idx, 20)
def send_kill(self, soft=True):
"""Send kill to all voices (soft=20Hz, hard=0Hz)."""
kill_freq = 20.0 if soft else 0.0
for voice in range(1, self.num_voices + 1):
self.send_single(kill_freq, voice)
print(
f" [Sent kill {'soft' if soft else 'hard'} ({kill_freq}) to {self.num_voices} voices]"
)
def ramp_to_chord(self, frequencies, duration_ms=3000, start_freq=20):
"""Ramp each voice from start_freq to target frequency(s) linearly.
Args:
frequencies: single frequency (int/float) or list of frequencies
duration_ms: duration in milliseconds
start_freq: starting frequency (single or per-voice list)
"""
import time
# Handle single frequency vs list
if isinstance(frequencies, (int, float)):
frequencies = [frequencies]
# Handle single start_freq vs list (per-voice)
if isinstance(start_freq, (int, float)):
start_freq = [start_freq] * len(frequencies)
elif start_freq is None:
# Default: use current frequency from sender
start_freq = [
self.get_current_frequency(i) for i in range(len(frequencies))
]
# Cancel any existing ramp for these voices before starting new ones
for voice_idx in range(len(frequencies)):
if voice_idx in self._ramp_events:
self._ramp_events[voice_idx].set()
if voice_idx in self._ramp_threads:
old_thread = self._ramp_threads[voice_idx]
if old_thread.is_alive():
print(f" [Cancelling existing ramp for voice {voice_idx + 1}]")
# Create new cancel events for each voice
for voice_idx in range(len(frequencies)):
self._ramp_events[voice_idx] = threading.Event()
# Fixed interval time for smooth ramp regardless of duration
step_interval = 0.02 # 20ms = 50 Hz update rate
num_steps = max(1, int(duration_ms / 1000 / step_interval))
def run_ramp(voice_idx, target_freq, voice_start):
cancel_event = self._ramp_events[voice_idx]
cancel_event.clear() # Reset cancellation flag
for step in range(num_steps + 1):
if cancel_event.is_set():
print(f" [Ramp cancelled for voice {voice_idx + 1}]")
break
progress = step / num_steps
current_freq = voice_start + (target_freq - voice_start) * progress
self.send_single(current_freq, voice_idx + 1)
time.sleep(step_interval)
else:
# Send final exact target frequency (only if not cancelled)
self.send_single(target_freq, voice_idx + 1)
# Start a thread for each voice
for voice_idx, target_freq in enumerate(frequencies):
voice_start = start_freq[voice_idx]
thread = threading.Thread(
target=run_ramp, args=(voice_idx, target_freq, voice_start)
)
thread.daemon = True
self._ramp_threads[voice_idx] = thread
thread.start()
print(f" [Started ramp to {len(frequencies)} voices over {duration_ms}ms]")
def send_current(self):
"""Send all voices at their current positions."""
for voice_idx in range(self.num_voices):
self.send_voice(voice_idx)
print(f" [Sent all {self.num_voices} voices at their positions]")
def _get_chord_preview(self, index):
"""Get a one-line preview of a chord."""
if index < 0 or index >= len(self.chords):
return None
chord = self.chords[index]
from fractions import Fraction
freqs = [
f"{self.fundamental * float(Fraction(p['fraction'])):.2f}" for p in chord
]
return f"Chord {index + 1}: {', '.join(freqs)} Hz"
def _get_chord_full(self, index):
"""Get full details of a chord."""
if index < 0 or index >= len(self.chords):
return []
return self.chords[index]
def display_chords(self):
"""Display previous, current, and next chords with per-voice positions."""
total = len(self.chords)
from fractions import Fraction
# Build prev/next info for each voice independently
prev_freqs = []
prev_positions = []
next_freqs = []
next_positions = []
current_freqs = []
for voice_idx in range(self.num_voices):
curr_idx = self.voice_indices[voice_idx]
# Previous for this voice
prev_idx = max(0, curr_idx - 1)
chord = self.chords[prev_idx]
pitch = chord[voice_idx]
frac = Fraction(pitch["fraction"])
freq = self.fundamental * float(frac)
prev_freqs.append(f"{freq:.2f}")
prev_positions.append(prev_idx + 1)
# Current for this voice
chord = self.chords[curr_idx]
pitch = chord[voice_idx]
frac = Fraction(pitch["fraction"])
freq = self.fundamental * float(frac)
current_freqs.append((freq, pitch["hs_array"]))
# Next for this voice
next_idx = min(len(self.chords) - 1, curr_idx + 1)
chord = self.chords[next_idx]
pitch = chord[voice_idx]
frac = Fraction(pitch["fraction"])
freq = self.fundamental * float(frac)
next_freqs.append(f"{freq:.2f}")
next_positions.append(next_idx + 1)
# Get current positions for display
curr_positions = [v + 1 for v in self.voice_indices]
print("\n" + "=" * 60)
# PREVIOUS line
print(f"PREVIOUS ({prev_positions}/{total})")
print(f"{', '.join(prev_freqs)} Hz")
print("-" * 60)
# CURRENT line with positions
print(f"CURRENT ({curr_positions}/{total})")
for voice_idx in range(self.num_voices):
freq, hs = current_freqs[voice_idx]
print(f" Voice {voice_idx + 1}: {freq:.2f} Hz {hs}")
print("-" * 60)
# NEXT line
print(f"Next ({next_positions}/{total})")
print(f"{', '.join(next_freqs)} Hz")
print("-" * 60)
mode_str = "[PREVIEW]" if self.preview_mode else "[SEND]"
print(
f"Fundamental: {self.fundamental} Hz | Destination: {self.ip}:{self.port} {mode_str}"
)
print(
"Controls: <-|-> all | [n]<-/->=voice n | p=preview | k/K=kill | Enter=send all | Esc=quit"
)
def play(self):
"""Interactive playback with keyboard control."""
if not self.chords:
print("No chords loaded!")
return
2026-03-17 17:19:52 +01:00
try:
import tty
import termios
self.preview_mode = True
self.current_index = 0
self.display_chords()
def get_key():
"""Get a keypress. Returns full escape sequences for arrow keys."""
fd = sys.stdin.fileno()
old_settings = termios.tcgetattr(fd)
try:
tty.setraw(sys.stdin.fileno())
ch = sys.stdin.read(1)
if ch == "\x1b":
next1 = sys.stdin.read(1)
if next1 == "[":
next2 = sys.stdin.read(1)
return "\x1b[" + next2
else:
return ch
finally:
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
return ch
num_buffer = ""
while True:
key = get_key()
if key == "\x1b":
break
elif key == "\x1b[D":
# Back arrow
if num_buffer:
# Individual voice: [n]<- move voice n back
try:
voice_num = int(num_buffer) - 1
if 0 <= voice_num < self.num_voices:
if self.voice_indices[voice_num] > 0:
self.voice_indices[voice_num] -= 1
self.display_chords()
if not self.preview_mode:
self.send_voice(voice_num)
else:
print(f"\n Voice {voice_num + 1} at start")
else:
print(f"\n Invalid voice: {num_buffer}")
except ValueError:
pass
num_buffer = ""
else:
# All voices: move all voice_indices back
can_move = any(vi > 0 for vi in self.voice_indices)
if can_move:
for voice_idx in range(self.num_voices):
if self.voice_indices[voice_idx] > 0:
self.voice_indices[voice_idx] -= 1
self.display_chords()
if not self.preview_mode:
self.send_current()
elif key == "\x1b[C":
# Forward arrow
if num_buffer:
# Individual voice: [n]-> move voice n forward
try:
voice_num = int(num_buffer) - 1
if 0 <= voice_num < self.num_voices:
if self.voice_indices[voice_num] < len(self.chords) - 1:
self.voice_indices[voice_num] += 1
self.display_chords()
if not self.preview_mode:
self.send_voice(voice_num)
else:
print(f"\n Voice {voice_num + 1} at end")
else:
print(f"\n Invalid voice: {num_buffer}")
except ValueError:
pass
num_buffer = ""
else:
# All voices: move all voice_indices forward
can_move = any(
vi < len(self.chords) - 1 for vi in self.voice_indices
)
if can_move:
for voice_idx in range(self.num_voices):
if self.voice_indices[voice_idx] < len(self.chords) - 1:
self.voice_indices[voice_idx] += 1
self.display_chords()
if not self.preview_mode:
self.send_current()
elif key == "p":
self.preview_mode = not self.preview_mode
self.display_chords()
elif key == "k":
self.send_kill(soft=True)
elif key == "K":
self.send_kill(soft=False)
elif key.isdigit():
num_buffer += key
print(f"\rVoice: {num_buffer}", end="", flush=True)
elif key == "\r":
if num_buffer:
try:
idx = int(num_buffer) - 1
if 0 <= idx < len(self.chords):
# Set all voices to this chord index
self.voice_indices = [idx] * self.num_voices
self.display_chords()
if not self.preview_mode:
self.send_current()
else:
print(f"\nIndex must be 1-{len(self.chords)}")
except ValueError:
print(f"\nInvalid number: {num_buffer}")
num_buffer = ""
elif not self.preview_mode:
self.send_current()
elif key == "\x7f" or key == "\x08":
if num_buffer:
num_buffer = num_buffer[:-1]
if num_buffer:
print(f"\rVoice: {num_buffer}", end="", flush=True)
else:
print("\r ", end="", flush=True)
print("\r", end="", flush=True)
except KeyboardInterrupt:
pass
print("\n\nPlayback stopped.")
def send_all(self):
"""Send all chords in sequence (for testing)."""
for i in range(len(self.chords)):
self.send_chord(i)
print("All chords sent!")
def load_chords_from_output(output_dir="output"):
"""Load chords from the standard output directory."""
chords_file = Path(output_dir) / "output_chords.json"
if not chords_file.exists():
raise FileNotFoundError(f"Chords file not found: {chords_file}")
with open(chords_file) as f:
return json.load(f)
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="OSC Sender for Compact Sets")
parser.add_argument("--ip", default="192.168.4.200", help="Destination IP")
parser.add_argument("--port", type=int, default=54001, help="Destination port")
parser.add_argument(
"--fundamental", type=float, default=100, help="Fundamental frequency in Hz"
)
parser.add_argument("--output-dir", default="output", help="Output directory")
parser.add_argument(
"--send-all", action="store_true", help="Send all chords in sequence"
)
args = parser.parse_args()
# Load chords
chords_file = Path(args.output_dir) / "output_chords.json"
if not chords_file.exists():
print(f"Error: {chords_file} not found!")
print("Run compact_sets.py first to generate chords.")
sys.exit(1)
# Create sender
sender = OSCSender(ip=args.ip, port=args.port, fundamental=args.fundamental)
sender.load_chords(chords_file)
if args.send_all:
sender.send_all()
else:
sender.play()