#!/usr/bin/env python
"""
OSC Sender - send chord frequencies via OSC for real-time playback.

Chords are read from ``output_chords.json``.  Every pitch of a chord is a
mapping with a ``"fraction"`` entry (a ratio that is multiplied by the
fundamental frequency) and an ``"hs_array"`` entry (shown in the display).
Each voice is sent as one ``/freq`` message carrying ``(voice_number, freq)``.
"""

import json
import os
import select
import sys
from fractions import Fraction
from pathlib import Path

try:
    from pythonosc import osc_message_builder
    from pythonosc.udp_client import SimpleUDPClient
except ImportError:
    # Do not terminate the interpreter from an import: the failure is reported
    # when a sender is constructed, or immediately when run as a script.
    osc_message_builder = None
    SimpleUDPClient = None

_OSC_MISSING_MSG = "Error: python-osc not installed. Run: pip install python-osc"


class OSCSender:
    """Send chord frequencies via OSC.

    Every voice keeps its own position in the chord list (``voice_indices``),
    so voices can be stepped through the sequence together or independently.
    """

    FREQ_ADDRESS = "/freq"
    KILL_SOFT_FREQ = 15.0
    KILL_HARD_FREQ = 0.0
    # Seconds to wait for the remainder of an escape sequence after ESC.
    ESCAPE_TIMEOUT = 0.05

    def __init__(self, ip="192.168.4.200", port=54001, fundamental=100):
        """Create the UDP client for ``ip:port``.

        Raises:
            ImportError: if python-osc is not installed.
        """
        if SimpleUDPClient is None:
            raise ImportError(_OSC_MISSING_MSG)
        self.ip = ip
        self.port = port
        self.fundamental = fundamental
        self.client = SimpleUDPClient(ip, port)
        self.chords = []
        self.current_index = 0
        self.preview_mode = True
        self.num_voices = 0
        self.voice_indices = []  # Track position for each voice independently

    # ------------------------------------------------------------------
    # Loading
    # ------------------------------------------------------------------
    def load_chords(self, chords_file):
        """Load chords from output_chords.json.

        Accepts both the old format (a bare list of chords) and the new format
        (a dict with a ``"chords"`` entry).  The voice count is taken from the
        first chord and every voice position is reset to 0.
        """
        with open(chords_file, encoding="utf-8") as f:
            chords_data = json.load(f)

        if isinstance(chords_data, dict) and "chords" in chords_data:
            self.chords = chords_data["chords"]
        else:
            self.chords = chords_data

        self.num_voices = len(self.chords[0]) if self.chords else 0
        self.voice_indices = [0] * self.num_voices

        print(f"Loaded {len(self.chords)} chords, {self.num_voices} voices")

    # ------------------------------------------------------------------
    # Frequency computation and OSC output
    # ------------------------------------------------------------------
    def _pitch_freq(self, pitch):
        """Return the frequency in Hz of ``pitch``: fundamental * fraction."""
        return self.fundamental * float(Fraction(pitch["fraction"]))

    def _send_freq(self, voice_number, freq):
        """Send one ``/freq`` message for a 1-indexed voice number."""
        msg = osc_message_builder.OscMessageBuilder(address=self.FREQ_ADDRESS)
        msg.add_arg(voice_number)
        msg.add_arg(freq)
        self.client.send(msg.build())

    def _kill_all(self, freq, label):
        """Send the fixed frequency ``freq`` to every voice and report it."""
        for voice_idx in range(self.num_voices):
            self._send_freq(voice_idx + 1, freq)
        print(f" [Sent kill {label} ({freq}) to {self.num_voices} voices]")

    def send_chord(self, index):
        """Send frequencies for a specific chord index.

        Out-of-range indices are ignored.
        """
        if index < 0 or index >= len(self.chords):
            return

        for voice_idx, pitch in enumerate(self.chords[index]):
            self._send_freq(voice_idx + 1, self._pitch_freq(pitch))

        print(f" [Sent chord {index + 1}/{len(self.chords)}]")

    def send_voice(self, voice_idx):
        """Send frequency for a single voice at its current position.

        Does nothing when the voice index, or that voice's chord position, is
        out of range.
        """
        if voice_idx < 0 or voice_idx >= len(self.voice_indices):
            return

        chord_idx = self.voice_indices[voice_idx]
        if chord_idx < 0 or chord_idx >= len(self.chords):
            return

        pitch = self.chords[chord_idx][voice_idx]
        self._send_freq(voice_idx + 1, self._pitch_freq(pitch))

    def send_current(self):
        """Send all voices at their current positions."""
        for voice_idx in range(self.num_voices):
            self.send_voice(voice_idx)
        print(f" [Sent all {self.num_voices} voices at their positions]")

    def send_all(self):
        """Send all chords in sequence (for testing)."""
        for index in range(len(self.chords)):
            self.send_chord(index)
        print("All chords sent!")

    # ------------------------------------------------------------------
    # Display
    # ------------------------------------------------------------------
    def _get_chord_preview(self, index):
        """Get a one-line preview of a chord, or None if out of range."""
        if index < 0 or index >= len(self.chords):
            return None

        freqs = [f"{self._pitch_freq(p):.2f}" for p in self.chords[index]]
        return f"Chord {index + 1}: {', '.join(freqs)} Hz"

    def _get_chord_full(self, index):
        """Get full details of a chord, or an empty list if out of range."""
        if index < 0 or index >= len(self.chords):
            return []
        return self.chords[index]

    def display_chords(self):
        """Display previous, current, and next chords with per-voice positions.

        Positions are shown 1-indexed.  The previous/next positions are
        clamped to the first/last chord.
        """
        total = len(self.chords)

        prev_freqs = []
        prev_positions = []
        next_freqs = []
        next_positions = []
        current_freqs = []

        for voice_idx in range(self.num_voices):
            curr_idx = self.voice_indices[voice_idx]
            prev_idx = max(0, curr_idx - 1)
            next_idx = min(total - 1, curr_idx + 1)

            prev_pitch = self.chords[prev_idx][voice_idx]
            prev_freqs.append(f"{self._pitch_freq(prev_pitch):.2f}")
            prev_positions.append(prev_idx + 1)

            curr_pitch = self.chords[curr_idx][voice_idx]
            current_freqs.append(
                (self._pitch_freq(curr_pitch), curr_pitch["hs_array"])
            )

            next_pitch = self.chords[next_idx][voice_idx]
            next_freqs.append(f"{self._pitch_freq(next_pitch):.2f}")
            next_positions.append(next_idx + 1)

        curr_positions = [v + 1 for v in self.voice_indices]

        print("\n" + "=" * 60)

        print(f"PREVIOUS ({prev_positions}/{total})")
        print(f"{', '.join(prev_freqs)} Hz")
        print("-" * 60)

        print(f"CURRENT ({curr_positions}/{total})")
        for voice_idx, (freq, hs) in enumerate(current_freqs):
            print(f" Voice {voice_idx + 1}: {freq:.2f} Hz {hs}")
        print("-" * 60)

        print(f"Next ({next_positions}/{total})")
        print(f"{', '.join(next_freqs)} Hz")
        print("-" * 60)

        mode_str = "[PREVIEW]" if self.preview_mode else "[SEND]"
        print(
            f"Fundamental: {self.fundamental} Hz | Destination: {self.ip}:{self.port} {mode_str}"
        )
        print(
            "Controls: <-|-> all | [n]<-/->=voice n | p=preview | k/K=kill | Enter=send all | Esc=quit"
        )

    # ------------------------------------------------------------------
    # Keyboard input
    # ------------------------------------------------------------------
    @staticmethod
    def _decode_key(read_char, has_pending):
        """Assemble one key from a character source.

        Args:
            read_char: callable returning the next input character (blocking).
            has_pending: callable returning True when more input is already
                available (or arrives within a short timeout).

        Returns:
            The character read, or ``"\\x1b[" + c`` for a three-character
            escape sequence such as an arrow key.  A lone ESC is returned
            immediately instead of blocking for a following character.  An ESC
            followed by anything other than ``[`` is reported as plain ESC and
            the following character is discarded.
        """
        ch = read_char()
        if ch != "\x1b" or not has_pending():
            return ch
        if read_char() != "[":
            return ch
        return "\x1b[" + (read_char() if has_pending() else "")

    def _read_key(self):
        """Read one key press from the terminal in raw mode (POSIX only).

        The terminal settings are restored before returning.  Input is read
        unbuffered from the file descriptor so that ``select`` reliably tells
        whether the rest of an escape sequence is waiting.

        Raises:
            EOFError: if standard input reaches end of file.
        """
        import termios
        import tty

        fd = sys.stdin.fileno()
        old_settings = termios.tcgetattr(fd)

        def read_char():
            data = os.read(fd, 1)
            if not data:
                raise EOFError
            # Every key binding is ASCII; other bytes decode to "" and are
            # ignored by the caller.
            return data.decode("ascii", errors="ignore")

        def has_pending():
            ready, _, _ = select.select([fd], [], [], self.ESCAPE_TIMEOUT)
            return bool(ready)

        try:
            tty.setraw(fd)
            return self._decode_key(read_char, has_pending)
        finally:
            termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)

    # ------------------------------------------------------------------
    # Interactive playback
    # ------------------------------------------------------------------
    def _step_voice(self, voice_idx, delta):
        """Move one voice by ``delta`` chords; return False at either end."""
        target = self.voice_indices[voice_idx] + delta
        if not 0 <= target < len(self.chords):
            return False
        self.voice_indices[voice_idx] = target
        return True

    def _step_all(self, delta):
        """Move every voice that can move by ``delta``; True if any moved."""
        moved = [self._step_voice(v, delta) for v in range(self.num_voices)]
        return any(moved)

    def _handle_arrow(self, delta, num_buffer):
        """Handle an arrow key: step voice ``num_buffer``, or all if empty."""
        if not num_buffer:
            if self._step_all(delta):
                self.display_chords()
                if not self.preview_mode:
                    self.send_current()
            return

        # Only the number parsing is guarded, so that malformed chord data is
        # not silently swallowed along with a bad voice number.
        try:
            voice_idx = int(num_buffer) - 1
        except ValueError:
            return

        if not 0 <= voice_idx < self.num_voices:
            print(f"\n Invalid voice: {num_buffer}")
        elif self._step_voice(voice_idx, delta):
            self.display_chords()
            if not self.preview_mode:
                self.send_voice(voice_idx)
        else:
            edge = "start" if delta < 0 else "end"
            print(f"\n Voice {voice_idx + 1} at {edge}")

    def _handle_enter(self, num_buffer):
        """Handle Enter: jump all voices to chord ``num_buffer``, or resend."""
        if not num_buffer:
            if not self.preview_mode:
                self.send_current()
            return

        try:
            idx = int(num_buffer) - 1
        except ValueError:
            print(f"\nInvalid number: {num_buffer}")
            return

        if 0 <= idx < len(self.chords):
            self.voice_indices = [idx] * self.num_voices
            self.display_chords()
            if not self.preview_mode:
                self.send_current()
        else:
            print(f"\nIndex must be 1-{len(self.chords)}")

    def play(self):
        """Interactive playback with keyboard control.

        Keys: left/right arrows step all voices (or voice n when digits were
        typed first), ``p`` toggles preview mode, ``k``/``K`` send the soft /
        hard kill frequency to every voice, Enter sends the current positions
        (or jumps all voices to the typed chord number), Backspace edits the
        typed number, Esc or Ctrl-C quits.
        """
        if not self.chords:
            print("No chords loaded!")
            return

        try:
            self.preview_mode = True
            self.current_index = 0
            self.display_chords()

            num_buffer = ""
            while True:
                key = self._read_key()

                # Raw mode delivers Ctrl-C as the character "\x03" rather than
                # raising KeyboardInterrupt, so it is handled as a quit key.
                if key in ("\x1b", "\x03"):
                    break

                if key == "\x1b[D":  # Back arrow
                    self._handle_arrow(-1, num_buffer)
                    num_buffer = ""
                elif key == "\x1b[C":  # Forward arrow
                    self._handle_arrow(1, num_buffer)
                    num_buffer = ""
                elif key == "p":
                    self.preview_mode = not self.preview_mode
                    self.display_chords()
                elif key == "k":
                    self._kill_all(self.KILL_SOFT_FREQ, "soft")
                elif key == "K":
                    self._kill_all(self.KILL_HARD_FREQ, "hard")
                elif key.isdigit():
                    num_buffer += key
                    print(f"\rVoice: {num_buffer}", end="", flush=True)
                elif key == "\r":
                    self._handle_enter(num_buffer)
                    num_buffer = ""
                elif key in ("\x7f", "\x08"):
                    if num_buffer:
                        num_buffer = num_buffer[:-1]
                        prompt = f"Voice: {num_buffer}" if num_buffer else ""
                        # "\x1b[K" erases to end of line so the removed digit
                        # (or the whole prompt) does not stay on screen.
                        print(f"\r{prompt}\x1b[K", end="", flush=True)

        except (KeyboardInterrupt, EOFError):
            pass

        print("\n\nPlayback stopped.")


def load_chords_from_output(output_dir="output"):
    """Load chords from the standard output directory.

    Returns the parsed JSON content of ``<output_dir>/output_chords.json``.

    Raises:
        FileNotFoundError: if the chords file does not exist.
    """
    chords_file = Path(output_dir) / "output_chords.json"
    if not chords_file.exists():
        raise FileNotFoundError(f"Chords file not found: {chords_file}")

    with open(chords_file, encoding="utf-8") as f:
        return json.load(f)


def main():
    """Command-line entry point: load the chords, then play or send them all."""
    import argparse

    if SimpleUDPClient is None:
        print(_OSC_MISSING_MSG)
        sys.exit(1)

    parser = argparse.ArgumentParser(description="OSC Sender for Compact Sets")
    parser.add_argument("--ip", default="192.168.4.200", help="Destination IP")
    parser.add_argument("--port", type=int, default=54001, help="Destination port")
    parser.add_argument(
        "--fundamental", type=float, default=100, help="Fundamental frequency in Hz"
    )
    parser.add_argument("--output-dir", default="output", help="Output directory")
    parser.add_argument(
        "--send-all", action="store_true", help="Send all chords in sequence"
    )
    args = parser.parse_args()

    # Load chords
    chords_file = Path(args.output_dir) / "output_chords.json"
    if not chords_file.exists():
        print(f"Error: {chords_file} not found!")
        print("Run compact_sets.py first to generate chords.")
        sys.exit(1)

    # Create sender
    sender = OSCSender(ip=args.ip, port=args.port, fundamental=args.fundamental)
    sender.load_chords(chords_file)

    if args.send_all:
        sender.send_all()
    else:
        sender.play()


if __name__ == "__main__":
    main()