diff --git a/core/actions.py b/core/actions.py new file mode 100644 index 0000000..270f9ea --- /dev/null +++ b/core/actions.py @@ -0,0 +1,43 @@ +class TimedAction: + + def __init__(self, start_t, duration_t, nodes, program): + + self.program = program + self.program.actions.append(self) + + if not isinstance(nodes, list): + nodes = [nodes] + + self.nodes = nodes # list of nodes + self.start_t = start_t + self.duration_t = duration_t + self.end_t = start_t + duration_t + + def tick(self, t): + + if t >= self.start_t and t <= self.end_t: + + for node in self.nodes: + + if t == self.start_t: + self.start(node) + + elif t == self.end_t: + self.end(node) + + else: + self.progress(t - self.start_t, node) + + # REIMPLEMENT + + def start(self, node): + + pass + + def end(self, node): + + pass + + def progress(self, rel_t, node): + + pass \ No newline at end of file diff --git a/core/nodes/sinenode.py b/core/nodes/sinenode.py index 5e98cf5..598d021 100644 --- a/core/nodes/sinenode.py +++ b/core/nodes/sinenode.py @@ -1,6 +1,7 @@ import math from ..soundnode import SoundNode +from ..actions import TimedAction class SineNode(SoundNode): @@ -8,21 +9,52 @@ class SineNode(SoundNode): super().__init__("sine", room) self.freqs = freqs + self.active = False + self.volume = 1 - def calc_freqs_volumes(self, t): - # This function returns volumes of each relevant freq - # at tick t - - res = dict() - for freq in self.freqs: - res[freq] = 1 / float(len(self.freqs)) - return res - - def fill_amp_cache(self, t): + def calc_r_amps(self, t): + if not self.active: + self.r_amps[t] = dict() + return + tdct = dict() - for freq, vol in self.calc_freqs_volumes(t).items(): - tdct[freq] = vol * math.sin(self.room.sine_multiplier * freq * t) - self.amp_cache[t] = tdct - \ No newline at end of file + for freq in self.freqs: + tdct[freq] = self.volume * math.sin(self.room.sine_multiplier * freq * t) + + self.r_amps[t] = tdct + +### + +class PlayAction(TimedAction): + + def __init__(self, start_t, duration_t, nodes, program): + + super().__init__(start_t, duration_t, nodes, program) + + def start(self, node): + + node.active = True + + def end(self, node): + + node.active = False + + +class NoteAction(TimedAction): + + def __init__(self, note, start_t, duration_t, nodes, program): + + super().__init__(start_t, duration_t, nodes, program) + self.note = note + + def start(self, node): + + note_freq = self.program.note_to_freq(self.note) + node.freqs = [note_freq] + node.active = True + + def end(self, node): + + node.active = False \ No newline at end of file diff --git a/core/nodes/sink.py b/core/nodes/sink.py new file mode 100644 index 0000000..16208a5 --- /dev/null +++ b/core/nodes/sink.py @@ -0,0 +1,29 @@ +from ..soundnode import * + +class SinkNode(SoundNode): + + def __init__(self, name, room): + + super().__init__(name, room) + + def calc_r_amps(self, t): + + # This function returns volumes of each relevant freq + # at tick t + + res = dict() + + for source_node in self.wire_in: + for freq, vol in self.sample_r_amps_by_wire(source_node, t).items(): + if not freq in res: + res[freq] = vol + else: + res[freq] += vol + + for source_node in self.air_in: + for freq, vol in self.sample_r_amps_by_air(source_node, t).items(): + if not freq in res: + res[freq] = vol + else: + res[freq] += vol + self.r_amps[t] = res \ No newline at end of file diff --git a/core/program.py b/core/program.py index 715c293..d65010c 100644 --- a/core/program.py +++ b/core/program.py @@ -1,15 +1,133 @@ +import os +import wave + from .room import Room + class Program: - def __init__(self): + NOTES_OF_OCTAVE = { + 0: ['C', 'B#'], + 1: ['C#', 'Db'], + 2: ['D'], + 3: ['D#', 'Eb'], + 4: ['E', 'Fb'], + 5: ['F', 'E#'], + 6: ['F#', 'Gb'], + 7: ['G'], + 8: ['G#', 'Ab'], + 9: ['A'], + 10: ['A#', 'Bb'], + 11: ['B', 'Cb'] + } + + def __init__(self, name): + + self.name = name + self.room = Room() + self.volume = 0.1 + self.actions = [] + + def reset(self): self.room = Room() + def note_to_freq(self, note): + + note_no = self.note_to_note_no(note) + return self.note_no_to_freq(note_no) + + def note_to_note_no(self, note): + + octave = int(note[-1]) + note = note[:-1] + + note_no_oct = 1 + for note_no_oct, notelst in self.NOTES_OF_OCTAVE.items(): + + if note in notelst: + break + + corr_note_no_oct = note_no_oct - 8 + return corr_note_no_oct + (octave*12) + + def note_no_to_freq(self, note_no): + + return 2**((note_no-49) / 12.0) * 440 + + def st(self, t_sec): + # seconds to ticks + return self.room.sample_rate * t_sec + + def generate_frames(self, start_t_sec, end_t_sec): + + frames = [] + + start_t = int(start_t_sec * self.room.sample_rate) + end_t = int(end_t_sec * self.room.sample_rate) + + for t in range(start_t, end_t): + + self.tick(t) + left_frame, right_frame = self.room.generate_frame(t) + + frames.append(left_frame) + frames.append(right_frame) + + return frames + + def write_frames_to_wavefile(self, frames, fn = 'export.wav'): + + with wave.open(fn, 'wb') as wav: + + wav.setnchannels(2) + + wav.setsampwidth(self.room.sample_width) + wav.setframerate(self.room.sample_rate) + wav.writeframes(b''.join(frames)) + + def playback(self, start_t_sec, end_t_sec, fn = 'export.wav'): + + cmd = f'play -q --volume {self.volume} {fn} trim {start_t_sec} {end_t_sec-start_t_sec}' + os.system(cmd) + + def export(self, start_t_sec, end_t_sec, fn = "export.wav"): + + frames = self.generate_frames(start_t_sec, end_t_sec) + self.write_frames_to_wavefile(frames, fn=fn) + + def interface(self): + + print(f'\nRunning program: {self.name}') + print(f'[g 10] to generate 10 seconds') + print(f'[vol 10] to set playback volume to 10%') + print(f'[p 1 5] to play seconds 1 to 5') + + cmd = '' + while cmd != 'q': + cmd = input('\n[vol {self.volume*100}%]>> ') + + if cmd.startswith('g '): + end_sec = float(cmd[2:]) + self.export(0, end_sec) + + if cmd.startswith('vol '): + vol = float(cmd[4:]) + self.volume = vol / 100.0 + + if cmd.startswith('p'): + pp = cmd.split(' ') + start_sec = float(pp[1]) + end_sec = float(pp[2]) + self.playback(start_sec, end_sec) + + def tick(self, t): + + for action in self.actions: + action.tick(t) + + # REIMPLEMENT THESE + def setup(self): - pass - - def tick(t): - pass \ No newline at end of file diff --git a/core/room.py b/core/room.py index 311b11a..50049dd 100644 --- a/core/room.py +++ b/core/room.py @@ -2,39 +2,13 @@ import wave import math from .soundnode import * +from .nodes.sink import * TAU = 2 * math.pi def stb(i: int, bytelen: int) -> bytes: return i.to_bytes(bytelen, byteorder='little', signed=True) -class SinkNode(SoundNode): - - def __init__(self, name, room): - - super().__init__(name, room) - - def fill_amp_cache(self, t): - # This function returns volumes of each relevant freq - # at tick t - - res = dict() - - for source_node in self.wire_in: - for freq, vol in self.sample_freqs_by_wire(source_node, t).items(): - if not freq in res: - res[freq] = vol - else: - res[freq] += vol - - for source_node in self.air_in: - for freq, vol in self.sample_freqs_by_air(source_node, t).items(): - if not freq in res: - res[freq] = vol - else: - res[freq] += vol - - self.amp_cache[t] = res class Room: @@ -42,13 +16,15 @@ class Room: self.nodes = [] - self.dissipation_quotient = 0.9993 self.sample_rate = 44100 self.speed_of_sound = 343 / float(self.sample_rate) # m/tick self.set_bit_depth(24) - self.left_sink = SinkNode('LEFT', self) - self.right_sink = SinkNode('RIGHT', self) + self.left_sink = SinkNode('left', self) + self.left_sink.start_location = (-0.3, 0, 0) + + self.right_sink = SinkNode('right', self) + self.right_sink.start_location = (0.3, 0, 0) self.sine_multiplier = TAU / self.sample_rate @@ -61,151 +37,57 @@ class Room: self.min_amp = -int((2**bit_depth) / 2.0) self.sample_width = int(self.bit_depth / 8.0) + def generate_frame(self, t): - def generate_frames(self, start_t_sec, end_t_sec): - - frames = [] - - start_t = start_t_sec * self.sample_rate - end_t = end_t_sec * self.sample_rate + for node in self.nodes: + node.tick_done = False + + if t%1000 == 0: + print(t, end='\r', flush=True) - for t in range(start_t, end_t): - - if self.running_program: - self.running_program.tick(t) + updates = True + while updates: + + updates = False + for node in self.nodes: - node.tick_done = False + + if not (t in node.r_amps): - if t%1000 == 0: - print(t) - - updates = True - - while updates: - - updates = False - - for node in self.nodes: + has_unprocessed_inputs = False - if not (t in node.amp_cache): - - has_unprocessed_inputs = False - - for in_node in node.air_in: + for in_node in node.air_in: + if not in_node.tick_done: + has_unprocessed_inputs = True + break + + if not has_unprocessed_inputs: + for in_node in node.wire_in: if not in_node.tick_done: has_unprocessed_inputs = True break + + if not has_unprocessed_inputs: - if not has_unprocessed_inputs: - for in_node in node.wire_in: - if not in_node.tick_done: - has_unprocessed_inputs = True - break - - if not has_unprocessed_inputs: - - node.fill_amp_cache(t) - node.tick_done = True - updates = True - - left_total_amp = self.max_amp * sum([vol for freq, vol in self.left_sink.amp_cache[t].items()]) - right_total_amp = self.max_amp * sum([vol for freq, vol in self.right_sink.amp_cache[t].items()]) - - if left_total_amp > self.max_amp: - left_total_amp = self.max_amp - - if left_total_amp < self.min_amp: - left_total_amp = self.min_amp - - if right_total_amp > self.max_amp: - right_total_amp = self.max_amp - - if right_total_amp < self.min_amp: - right_total_amp = self.min_amp - - frames.append(stb(int(left_total_amp), self.sample_width)) - frames.append(stb(int(right_total_amp), self.sample_width)) + node.calc_r_amps(t) + node.tick_done = True + updates = True - return frames - - def write_frames_to_wavefile(self, fn, frames): + left_total_amp = self.max_amp * sum([vol for freq, vol in self.left_sink.r_amps[t].items()]) + right_total_amp = self.max_amp * sum([vol for freq, vol in self.right_sink.r_amps[t].items()]) - with wave.open(fn, 'wb') as wav: + if left_total_amp > self.max_amp: + left_total_amp = self.max_amp - wav.setnchannels(2) - - wav.setsampwidth(self.sample_width) - wav.setframerate(self.sample_rate) - wav.writeframes(b''.join(frames)) - - def record(self, fn, start_t_sec, end_t_sec): + if left_total_amp < self.min_amp: + left_total_amp = self.min_amp - frames = self.generate_frames(start_t_sec, end_t_sec) - self.write_frames_to_wavefile(fn, frames) + if right_total_amp > self.max_amp: + right_total_amp = self.max_amp + + if right_total_amp < self.min_amp: + right_total_amp = self.min_amp + return stb(int(left_total_amp), self.sample_width), stb(int(right_total_amp), self.sample_width) - """ - def record(self, fn, start_t_sec, end_t_sec): - - with wave.open(fn, 'wb') as wav: - - wav.setnchannels(2) - - self.sample_width = int(self.bit_depth / 8.0) - wav.setsampwidth(self.sample_width) - wav.setframerate(self.sample_rate) - - frames = [] - - start_t = start_t_sec * self.sample_rate - end_t = end_t_sec * self.sample_rate - - for t in range(start_t, end_t): - - #№if t%1000 == 0: - # print(t) - f = self.lowest_freq - - left_total_amp = 0 - right_total_amp = 0 - - left_amps = [] - right_amps = [] - - while f < self.highest_freq: - - for in_node in self.left_sink.air_in: - left_amps.append(in_node.amp_at_tick_by_air(f, t, self.left_sink)) - - for in_node in self.left_sink.wire_in: - left_amps.append(in_node.amp_at_tick(f, t)) - - for in_node in self.right_sink.air_in: - right_amps.append(in_node.amp_at_tick_by_air(f, t, self.right_sink)) - - for in_node in self.right_sink.wire_in: - right_amps.append(in_node.amp_at_tick(f, t)) - - f += self.freq_sample_step - - left_total_amp = self.max_amp * sum(left_amps) - right_total_amp = self.max_amp * sum(right_amps) - - if left_total_amp > self.max_amp: - left_total_amp = self.max_amp - - if left_total_amp < self.min_amp: - left_total_amp = self.min_amp - - if right_total_amp > self.max_amp: - right_total_amp = self.max_amp - - if right_total_amp < self.min_amp: - right_total_amp = self.min_amp - - frames.append(stb(int(left_total_amp), self.sample_width)) - frames.append(stb(int(right_total_amp), self.sample_width)) - - wav.writeframes(b''.join(frames)) - """ \ No newline at end of file diff --git a/core/soundnode.py b/core/soundnode.py index 689602a..72e0831 100644 --- a/core/soundnode.py +++ b/core/soundnode.py @@ -9,22 +9,17 @@ class SoundNode: self.room = room self.room.nodes.append(self) - self.amp_cache = dict() # {tick: {freq: amp}} + self.r_amps = dict() # {tick: {freq: r_amp}} self.air_in = [] self.wire_in = [] self.start_location = (0, 0, 0) + # Used by the tick cycle to correctly order the nodes. + # Do not touch this. self.tick_done = False - def location(self, t): - - # Location of the soundnote (x,y,z) in meters - # at time t. - - return self.start_location - def distance_to_node(self, other_node, t): loc = self.location(t) @@ -32,49 +27,47 @@ class SoundNode: return (loc[0]-other_loc[0])**2 + (loc[1]-other_loc[1])**2 + (loc[2]-other_loc[2])**2 + def sample_r_amps_by_wire(self, source_node, current_t): - def fill_amp_cache(self, t): - - self.amp_cache[t] = dict() - - def sample_freqs_by_wire(self, source_node, current_t): - - if current_t in source_node.amp_cache: - return source_node.amp_cache[current_t] + if current_t in source_node.r_amps: + return source_node.r_amps[current_t] return dict() - - def sample_freqs_by_air(self, source_node, current_t): + + def sample_r_amps_by_air(self, source_node, current_t): dist = self.distance_to_node(source_node, current_t) sample_t = current_t - int(dist / self.room.speed_of_sound) - if sample_t in source_node.amp_cache: - return {f: a / float(dist) for f, a in source_node.amp_cache[sample_t].items()} + if sample_t in source_node.r_amps: + return {f: a / float(dist) for f, a in source_node.r_amps[sample_t].items()} return dict() - def add_air_output(self, out_node): + def air_to(self, out_node): if not self in out_node.air_in: out_node.air_in.append(self) - def add_wire_output(self, out_node): + def wire_to(self, out_node): if not self in out_node.wire_in: out_node.wire_in.append(self) - """ + ## THE FOLLOWING FUNCTIONS NEED TO BE DEFINED BY CONCRETE NODES + + def location(self, t): - def amp_at_tick(self, f, t): + # Location of the soundnote (x,y,z) in meters + # at time t. - return self.frequency_max_rel_amp(f, t) * math.sin(self.room.sine_multiplier * f * t) + return self.start_location + + def calc_r_amps(self, t): - def amp_at_tick_by_air(self, f, t, node): + # Fill the amp_cache for tick t + # {freq: relative_amp} + # relative_amp must be between 0 and 1 - dist = self.distance_to_node(node, t) - t = t - int(dist / self.room.speed_of_sound) - - return self.frequency_max_rel_amp(f, t) * math.sin(self.room.sine_multiplier * f * t) - """ + self.r_amps[t] = dict() \ No newline at end of file diff --git a/test.py b/test.py index cd8d428..453bc56 100644 --- a/test.py +++ b/test.py @@ -1,21 +1,42 @@ -from core.room import Room -from core.prorgam import Program -from core.soundnode import SoundNode -from core.nodes.sinenode import SineNode import math -R = Room() -R.left_sink.start_location = (-1, 0, 0) -R.right_sink.start_location = (2, 0, 0) +from core.room import Room +from core.program import Program +from core.soundnode import SoundNode +from core.nodes.sinenode import * -sn = SineNode([440, 440*2, 440*3, 440*4, 220], R) -sn2 = SineNode([523.25, 523.25*2, 523.25*3, 523.25*4, 220], R) +class TestProgram(Program): + + def __init__(self): + + super().__init__("testprogram") -sn.add_air_output(R.left_sink) -sn.add_air_output(R.right_sink) -sn2.add_air_output(R.left_sink) -sn2.add_air_output(R.right_sink) -sn2.start_location = (1, 0, 0) + def setup(self): + + self.reset() + + sn = SineNode([], self.room) + + sn.air_to(self.room.left_sink) + #sn.air_to(self.room.right_sink) + + NoteAction('A4', self.st(0), self.st(0.5), [sn], self) + NoteAction('G4', self.st(1), self.st(0.5), [sn], self) + NoteAction('F4', self.st(2), self.st(0.5), [sn], self) + NoteAction('E4', self.st(3), self.st(0.5), [sn], self) + + sn2 = SineNode([], self.room) + + #sn2.air_to(self.room.left_sink) + sn2.air_to(self.room.right_sink) + + NoteAction('A3', self.st(0), self.st(0.5), [sn2], self) + NoteAction('G3', self.st(1), self.st(0.5), [sn2], self) + NoteAction('F3', self.st(2), self.st(0.5), [sn2], self) + NoteAction('E3', self.st(3), self.st(0.5), [sn2], self) -R.record('test6.wav', 0, 2) \ No newline at end of file + +TP = TestProgram() +TP.setup() +TP.interface() \ No newline at end of file