75 lines
2.1 KiB
Python
75 lines
2.1 KiB
Python
import os
|
|
import torch
|
|
import librosa
|
|
from torch.utils.data import Dataset
|
|
import numpy as np
|
|
import random
|
|
|
|
|
|
# Audio front-end settings shared by the STFT helper and the dataset below.
SR = 44_100      # default sample rate (Hz) passed to librosa.load
DURATION = 2.0   # length in seconds of each clip cut by the dataset
N_FFT = 1024     # n_fft passed to librosa.stft
HOP = 512        # hop_length (samples) passed to librosa.stft
|
|
|
|
|
|
def audio_to_logmag(audio, n_fft=N_FFT, hop_length=HOP):
    """Convert a waveform into a log-compressed STFT magnitude spectrogram.

    Args:
        audio: Real-valued waveform as a NumPy array. The dataset in this
            file passes a 1-D mono signal.
        n_fft: FFT size handed to ``librosa.stft``. Defaults to ``N_FFT``.
        hop_length: Hop between successive frames, in samples. Defaults to
            ``HOP``.

    Returns:
        ``np.log1p(np.abs(stft))`` as a NumPy array. Per the librosa
        documentation, 1-D input yields shape
        ``(1 + n_fft // 2, time_frames)``, i.e. ``(513, T)`` with the default
        ``n_fft``. No leading channel axis is added here; a caller that needs
        ``(1, freq_bins, time_frames)`` must add that axis itself.
    """
    spectrum = librosa.stft(audio, n_fft=n_fft, hop_length=hop_length)
    magnitude = np.abs(spectrum)
    # log(1 + x) keeps zero-magnitude bins at 0 instead of -inf.
    return np.log1p(magnitude)
|
|
|
|
|
|
class WaveformDataset(Dataset):
    """Paired lossy/clean audio clips served as log-magnitude spectrograms.

    Files are paired by identical file name: a name is used only when it is
    present in both ``lossy_dir`` and ``clean_dir``. Each item is a tuple
    ``(lossy, clean)`` of tensors built from ``audio_to_logmag`` with a
    leading axis of size 1 added via ``unsqueeze(0)``.

    NOTE(review): items are memoised in ``self.cache`` *after* the random
    crop has been taken, so every index keeps returning the crop chosen on
    its first access, and the cache grows without bound. Confirm this is the
    intended trade-off before relying on the crop for augmentation.
    """

    def __init__(self, lossy_dir, clean_dir, sr=SR, segment_sec=4):
        """Index the two directories and record the loading parameters.

        Args:
            lossy_dir: Directory containing the degraded audio files.
            clean_dir: Directory containing the reference audio files.
            sr: Sample rate passed to ``librosa.load`` for both files.
            segment_sec: Used only to compute ``self.segment_len``; the clip
                length cut in ``__getitem__`` is governed by ``DURATION``.
        """
        self.cache = dict()
        self.sr = sr
        self.lossy_dir = lossy_dir
        self.clean_dir = clean_dir
        self.segment_len = int(segment_sec * sr)
        self.lossy_files = sorted(os.listdir(lossy_dir))
        self.clean_files = sorted(os.listdir(clean_dir))

        # Build the lookup set once rather than once per candidate name.
        clean_names = set(self.clean_files)
        self.file_pairs = [
            (name, name) for name in self.lossy_files if name in clean_names
        ]

    def __len__(self):
        """Return the number of file names present in both directories."""
        return len(self.file_pairs)

    def __getitem__(self, idx):
        """Return the ``(lossy, clean)`` spectrogram tensors for pair ``idx``.

        Both files are loaded as mono at ``self.sr``, truncated to their
        common length, then cut to ``DURATION`` seconds: zero-padded at the
        end when shorter, otherwise cropped at a random offset shared by
        both signals.
        """
        if idx in self.cache:
            return self.cache[idx]

        # Index the paired list, not the raw directory listings: the raw
        # lists can contain unpaired files and then no longer line up.
        lossy_name, clean_name = self.file_pairs[idx]
        lossy_path = os.path.join(self.lossy_dir, lossy_name)
        clean_path = os.path.join(self.clean_dir, clean_name)

        lossy, _ = librosa.load(lossy_path, sr=self.sr, mono=True)
        clean, _ = librosa.load(clean_path, sr=self.sr, mono=True)

        # Trim both signals to the shorter one so they stay sample-aligned.
        min_len = min(len(lossy), len(clean))
        lossy, clean = lossy[:min_len], clean[:min_len]

        # Use the rate the audio was actually loaded at, so the clip lasts
        # DURATION seconds even when a non-default ``sr`` was requested.
        clip_len = int(DURATION * self.sr)
        if min_len < clip_len:
            # Too short: zero-pad the tail up to the clip length.
            shortfall = clip_len - min_len
            lossy = np.pad(lossy, (0, shortfall))
            clean = np.pad(clean, (0, shortfall))
            start = 0
        else:
            start = random.randint(0, min_len - clip_len)

        stop = start + clip_len
        lossy = lossy[start:stop]
        clean = clean[start:stop]

        ans = (
            torch.from_numpy(audio_to_logmag(lossy)).unsqueeze(0),
            torch.from_numpy(audio_to_logmag(clean)).unsqueeze(0),
        )

        self.cache[idx] = ans
        return ans
|