206 lines
No EOL
4.9 KiB
Python
206 lines
No EOL
4.9 KiB
Python
import wave
|
|
import math
|
|
|
|
from .soundnode import *
|
|
|
|
# Full circle in radians (2 * pi); used to turn per-sample phase into radians.
TAU = math.tau
|
|
|
|
def stb(i: int, bytelen: int) -> bytes:
    """Encode ``i`` as a signed little-endian integer of ``bytelen`` bytes.

    Raises ``OverflowError`` (from ``int.to_bytes``) when ``i`` does not fit
    in ``bytelen`` bytes.
    """
    encoded = i.to_bytes(bytelen, byteorder='little', signed=True)
    return encoded
|
|
|
|
class SinkNode(SoundNode):
    """Sound node that sums whatever reaches it over its wire and air inputs."""

    def __init__(self, name, room):
        """Initialise the sink by delegating to the ``SoundNode`` constructor."""
        super().__init__(name, room)

    def fill_amp_cache(self, t):
        """Compute the per-frequency volumes at tick ``t`` and cache them.

        Every node in ``self.wire_in`` is sampled with
        ``sample_freqs_by_wire`` and every node in ``self.air_in`` with
        ``sample_freqs_by_air``; volumes reported for the same frequency are
        added together.  The resulting ``{freq: volume}`` dict is stored in
        ``self.amp_cache[t]``; nothing is returned.
        """
        totals = {}

        def accumulate(samples):
            # Merge one source's {freq: volume} mapping into the running totals.
            for freq, vol in samples.items():
                if freq in totals:
                    totals[freq] += vol
                else:
                    totals[freq] = vol

        # Wired inputs are sampled first, then airborne ones.
        for upstream in self.wire_in:
            accumulate(self.sample_freqs_by_wire(upstream, t))

        for upstream in self.air_in:
            accumulate(self.sample_freqs_by_air(upstream, t))

        self.amp_cache[t] = totals
|
|
|
|
class Room:
|
|
|
|
def __init__(self):
|
|
|
|
self.nodes = []
|
|
|
|
self.dissipation_quotient = 0.9993
|
|
self.sample_rate = 44100
|
|
self.speed_of_sound = 343 / float(self.sample_rate) # m/tick
|
|
self.set_bit_depth(24)
|
|
|
|
self.left_sink = SinkNode('LEFT', self)
|
|
self.right_sink = SinkNode('RIGHT', self)
|
|
|
|
self.sine_multiplier = TAU / self.sample_rate
|
|
|
|
def set_bit_depth(self, bit_depth):
|
|
|
|
self.bit_depth = bit_depth
|
|
self.max_amp = int((2**bit_depth) / 2.0 - 1)
|
|
self.min_amp = -int((2**bit_depth) / 2.0)
|
|
self.sample_width = int(self.bit_depth / 8.0)
|
|
|
|
|
|
def generate_frames(self, start_t_sec, end_t_sec):
|
|
|
|
frames = []
|
|
|
|
start_t = start_t_sec * self.sample_rate
|
|
end_t = end_t_sec * self.sample_rate
|
|
|
|
for t in range(start_t, end_t):
|
|
|
|
for node in self.nodes:
|
|
node.tick_done = False
|
|
|
|
if t%1000 == 0:
|
|
print(t)
|
|
|
|
updates = True
|
|
|
|
while updates:
|
|
|
|
updates = False
|
|
|
|
for node in self.nodes:
|
|
|
|
if not (t in node.amp_cache):
|
|
|
|
has_unprocessed_inputs = False
|
|
|
|
for in_node in node.air_in:
|
|
if not in_node.tick_done:
|
|
has_unprocessed_inputs = True
|
|
break
|
|
|
|
if not has_unprocessed_inputs:
|
|
for in_node in node.wire_in:
|
|
if not in_node.tick_done:
|
|
has_unprocessed_inputs = True
|
|
break
|
|
|
|
if not has_unprocessed_inputs:
|
|
|
|
node.fill_amp_cache(t)
|
|
node.tick_done = True
|
|
updates = True
|
|
|
|
left_total_amp = self.max_amp * sum([vol for freq, vol in self.left_sink.amp_cache[t].items()])
|
|
right_total_amp = self.max_amp * sum([vol for freq, vol in self.right_sink.amp_cache[t].items()])
|
|
|
|
if left_total_amp > self.max_amp:
|
|
left_total_amp = self.max_amp
|
|
|
|
if left_total_amp < self.min_amp:
|
|
left_total_amp = self.min_amp
|
|
|
|
if right_total_amp > self.max_amp:
|
|
right_total_amp = self.max_amp
|
|
|
|
if right_total_amp < self.min_amp:
|
|
right_total_amp = self.min_amp
|
|
|
|
frames.append(stb(int(left_total_amp), self.sample_width))
|
|
frames.append(stb(int(right_total_amp), self.sample_width))
|
|
|
|
return frames
|
|
|
|
def write_frames_to_wavefile(self, fn, frames):
|
|
|
|
with wave.open(fn, 'wb') as wav:
|
|
|
|
wav.setnchannels(2)
|
|
|
|
wav.setsampwidth(self.sample_width)
|
|
wav.setframerate(self.sample_rate)
|
|
wav.writeframes(b''.join(frames))
|
|
|
|
def record(self, fn, start_t_sec, end_t_sec):
|
|
|
|
frames = self.generate_frames(start_t_sec, end_t_sec)
|
|
self.write_frames_to_wavefile(fn, frames)
|
|
|
|
|
|
"""
|
|
def record(self, fn, start_t_sec, end_t_sec):
|
|
|
|
with wave.open(fn, 'wb') as wav:
|
|
|
|
wav.setnchannels(2)
|
|
|
|
self.sample_width = int(self.bit_depth / 8.0)
|
|
wav.setsampwidth(self.sample_width)
|
|
wav.setframerate(self.sample_rate)
|
|
|
|
frames = []
|
|
|
|
start_t = start_t_sec * self.sample_rate
|
|
end_t = end_t_sec * self.sample_rate
|
|
|
|
for t in range(start_t, end_t):
|
|
|
|
#№if t%1000 == 0:
|
|
# print(t)
|
|
f = self.lowest_freq
|
|
|
|
left_total_amp = 0
|
|
right_total_amp = 0
|
|
|
|
left_amps = []
|
|
right_amps = []
|
|
|
|
while f < self.highest_freq:
|
|
|
|
for in_node in self.left_sink.air_in:
|
|
left_amps.append(in_node.amp_at_tick_by_air(f, t, self.left_sink))
|
|
|
|
for in_node in self.left_sink.wire_in:
|
|
left_amps.append(in_node.amp_at_tick(f, t))
|
|
|
|
for in_node in self.right_sink.air_in:
|
|
right_amps.append(in_node.amp_at_tick_by_air(f, t, self.right_sink))
|
|
|
|
for in_node in self.right_sink.wire_in:
|
|
right_amps.append(in_node.amp_at_tick(f, t))
|
|
|
|
f += self.freq_sample_step
|
|
|
|
left_total_amp = self.max_amp * sum(left_amps)
|
|
right_total_amp = self.max_amp * sum(right_amps)
|
|
|
|
if left_total_amp > self.max_amp:
|
|
left_total_amp = self.max_amp
|
|
|
|
if left_total_amp < self.min_amp:
|
|
left_total_amp = self.min_amp
|
|
|
|
if right_total_amp > self.max_amp:
|
|
right_total_amp = self.max_amp
|
|
|
|
if right_total_amp < self.min_amp:
|
|
right_total_amp = self.min_amp
|
|
|
|
frames.append(stb(int(left_total_amp), self.sample_width))
|
|
frames.append(stb(int(right_total_amp), self.sample_width))
|
|
|
|
wav.writeframes(b''.join(frames))
|
|
""" |