protocols 440Hz vs 432Hz

"""
================================================================================
PROTOCOLO COMPLETO DE ANÁLISIS DE ESTABILIDAD ACÚSTICA - 432 Hz
================================================================================

Basado en: "Quantum Sonorology: Sound Fission and the Test of 
            Biophysical Stability of Vibrating Matter at 432/440 Hz"
Autor: Julio Alberto Solis Trejo (Liah Steer)

Implementación: Análisis completo con generación de datos, visualizaciones
                y exportación de resultados en formato académico.

Licencia: CC BY 4.0
Autor de la implementación: Código libre para uso académico y científico
================================================================================
"""

import numpy as np
import matplotlib.pyplot as plt
from scipy import signal
from scipy.fft import fft, fftfreq
from scipy.stats import shapiro, levene, ttest_ind, ttest_rel
import pandas as pd
import seaborn as sns
from datetime import datetime
import json
import warnings
warnings.filterwarnings('ignore')

# Configuración de estilo para gráficos científicos
plt.style.use('seaborn-v0_8-darkgrid')
sns.set_palette("husl")

# ============================================================================
# CLASE PRINCIPAL: Analizador de Estabilidad Acústica
# ============================================================================

class AcousticStabilityAnalyzer:
    """
    Analizador completo de estabilidad acústica según protocolo FSE/FDT.
    
    Implementa los dos protocolos principales del estudio:
    1. FSE (Fission by Energy Saturation): Fisión por Saturación de Energía
    2. FDT (Fission by Temporal Desynchronization): Fisión por Desincronización
    """
    
    def __init__(self, sample_rate=48000, rms_db=-18.0):
        """
        Inicializa el analizador.
        
        Args:
            sample_rate: Tasa de muestreo en Hz (48000 Hz estándar broadcast)
            rms_db: Nivel RMS en dBFS (-18 dB = 18 dB headroom)
        """
        self.sample_rate = sample_rate
        self.rms_db = rms_db
        self.results = {}
        
        print("="*80)
        print(" ANALIZADOR DE ESTABILIDAD ACÚSTICA - PROTOCOLO COMPLETO")
        print("="*80)
        print(f" Sample Rate: {sample_rate} Hz")
        print(f" RMS Level: {rms_db} dBFS")
        print(f" Fecha: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        print("="*80 + "\n")
    
    def generate_pure_tone(self, frequency, duration=5.0, apply_fade=True):
        """
        Genera tono sinusoidal puro según especificaciones del protocolo.
        
        Especificaciones técnicas (página 7 del documento):
        - Waveform: Pure sinusoidal (sin harmonics)
        - Precision: ±0.001 Hz
        - Initial THD: <0.01%
        - Fade-in/out: 2.0 s logarithmic curve
        
        Args:
            frequency: Frecuencia fundamental en Hz
            duration: Duración total en segundos
            apply_fade: Aplicar fade-in/out para prevenir clicks
        
        Returns:
            tuple: (audio_array, time_array)
        """
        t = np.linspace(0, duration, int(self.sample_rate * duration), endpoint=False)
        
        # Generación de onda sinusoidal pura
        audio = np.sin(2 * np.pi * frequency * t)
        
        # Normalización a nivel RMS especificado
        rms_linear = 10 ** (self.rms_db / 20)
        current_rms = np.sqrt(np.mean(audio**2))
        audio = audio * (rms_linear / current_rms)
        
        # Aplicar fade-in/out logarítmico (previene clicks de inicio/final)
        if apply_fade:
            fade_duration = 0.02 # 20 ms según protocolo simplificado
            fade_samples = int(fade_duration * self.sample_rate)
            
            # Curva logarítmica para fade natural
            fade_in = np.logspace(-3, 0, fade_samples)
            fade_in = fade_in / fade_in[-1] # Normalizar a 1
            
            fade_out = np.logspace(0, -3, fade_samples)
            fade_out = fade_out / fade_out[0]
            
            audio[:fade_samples] *= fade_in
            audio[-fade_samples:] *= fade_out
        
        return audio, t
    
    def calculate_thd(self, audio, fundamental_freq, n_harmonics=6):
        """
        Calcula Total Harmonic Distortion (THD) según protocolo.
        
        Fórmula (página 11 del documento):
        THD = √(H₂² + H₃² + H₄² + H₅² + H₆²) / H₁
        
        Umbral crítico: THD > 1.0% (0.5-1.5% límite de detección en oyentes)
        
        Args:
            audio: Señal de audio
            fundamental_freq: Frecuencia fundamental
            n_harmonics: Número de armónicos a considerar (default: 6)
        
        Returns:
            dict: {'thd_percent', 'harmonics', 'thd_db'}
        """
        N = len(audio)
        
        # FFT con ventana Hann (óptimo según página 9)
        window = np.hanning(N)
        fft_result = fft(audio * window)
        freqs = fftfreq(N, 1/self.sample_rate)
        
        # Magnitud espectral (solo frecuencias positivas)
        magnitude = np.abs(fft_result)[:N//2]
        freqs_positive = freqs[:N//2]
        
        # Extraer magnitudes de armónicos
        harmonics = []
        harmonic_freqs = []
        
        for h in range(1, n_harmonics + 1):
            target_freq = fundamental_freq * h
            
            # Búsqueda de pico en ventana de ±5 Hz
            search_window = 5.0
            mask = (freqs_positive >= target_freq - search_window) & \
                   (freqs_positive <= target_freq + search_window)
            
            if np.any(mask):
                idx = np.argmax(magnitude[mask])
                idx_global = np.where(mask)[0][idx]
                harmonics.append(magnitude[idx_global])
                harmonic_freqs.append(freqs_positive[idx_global])
            else:
                harmonics.append(0.0)
                harmonic_freqs.append(target_freq)
        
        # Cálculo de THD
        H1 = harmonics[0]
        if H1 > 0:
            sum_harmonics_squared = np.sum(np.array(harmonics[1:])**2)
            thd_ratio = np.sqrt(sum_harmonics_squared) / H1
            thd_percent = thd_ratio * 100
            thd_db = 20 * np.log10(thd_ratio) if thd_ratio > 0 else -np.inf
        else:
            thd_percent = 0.0
            thd_db = -np.inf
        
        return {
            'thd_percent': thd_percent,
            'thd_db': thd_db,
            'harmonics': harmonics,
            'harmonic_freqs': harmonic_freqs,
            'H1': H1
        }
    
    def temporal_correlation(self, audio, delay_ms=10):
        """
        Calcula correlación temporal con delay específico.
        
        Fórmula (página 12 del documento):
        r = Σ(x·y) / √(Σx² · Σy²)
        
        Umbral crítico: r < 0.70 (pérdida >50% de varianza compartida)
        Justificación: Basado en efecto de precedencia (Haas, 1972)
        
        Args:
            audio: Señal de audio
            delay_ms: Delay en milisegundos (default: 10 ms)
        
        Returns:
            float: Coeficiente de correlación (-1 a 1)
        """
        delay_samples = int((delay_ms / 1000) * self.sample_rate)
        
        if delay_samples >= len(audio):
            return 0.0
        
        x = audio[:-delay_samples]
        y = audio[delay_samples:]
        
        # Correlación de Pearson
        if len(x) > 0 and len(y) > 0:
            correlation_matrix = np.corrcoef(x, y)
            r = correlation_matrix[0, 1]
        else:
            r = 0.0
        
        return r
    
    def protocol_fse(self, frequency, n_replicas=10, verbose=True):
        """
        PROTOCOLO FSE: Fisión por Saturación de Energía
        
        Procedimiento (páginas 10-14 del documento):
        1. Generar tono puro a frecuencia especificada
        2. Incrementar ganancia sistemáticamente (pasos de 0.1 dB)
        3. Monitorear criterios duales:
           - Criterio A: THD > 1.0%
           - Criterio B: Correlación temporal < 0.70
        4. Registrar E_fission cuando AMBOS criterios se cumplan
        5. Repetir n_replicas veces con regeneración completa
        
        Args:
            frequency: Frecuencia a evaluar (Hz)
            n_replicas: Número de réplicas (default: 10)
            verbose: Mostrar progreso detallado
        
        Returns:
            dict: Resultados completos del protocolo
        """
        if verbose:
            print("\n" + "="*80)
            print(f" PROTOCOLO FSE - FISIÓN POR SATURACIÓN DE ENERGÍA")
            print(f" Frecuencia: {frequency} Hz | Réplicas: {n_replicas}")
            print("="*80 + "\n")
        
        e_fission_values = []
        thd_at_fission = []
        corr_at_fission = []
        
        for replica in range(n_replicas):
            # Generar tono puro (regeneración completa para evitar artefactos)
            audio, t = self.generate_pure_tone(frequency, duration=5.0)
            
            # Incremento sistemático de ganancia
            current_gain = 0.0
            step = 0.1 # dB por iteración
            max_gain = 25.0 # Límite de seguridad
            
            fission_found = False
            
            while current_gain < max_gain and not fission_found:
                # Aplicar ganancia en escala lineal
                gain_linear = 10 ** (current_gain / 20)
                audio_gained = audio * gain_linear
                
                # Brick-wall limiter a -0.1 dBFS (previene clipping destructivo)
                audio_gained = np.clip(audio_gained, -0.999, 0.999)
                
                # CRITERIO A: Total Harmonic Distortion
                thd_result = self.calculate_thd(audio_gained, frequency)
                thd = thd_result['thd_percent']
                
                # CRITERIO B: Correlación temporal
                corr = self.temporal_correlation(audio_gained, delay_ms=10)
                
                # Lógica de decisión: AMBOS criterios deben cumplirse
                if thd > 1.0 and corr < 0.70:
                    e_fission_values.append(current_gain)
                    thd_at_fission.append(thd)
                    corr_at_fission.append(corr)
                    fission_found = True
                    
                    if verbose:
                        print(f" Réplica {replica+1:2d}: E_fission = {current_gain:5.2f} dB "
                              f"| THD = {thd:6.3f}% | r = {corr:5.3f}")
                
                current_gain += step
            
            if not fission_found and verbose:
                print(f" Réplica {replica+1:2d}: NO alcanzó umbral (límite {max_gain} dB)")
        
        # Cálculo de estadísticas descriptivas
        if len(e_fission_values) > 0:
            results = {
                'frequency': frequency,
                'protocol': 'FSE',
                'n_replicas': len(e_fission_values),
                'e_fission_values': e_fission_values,
                'thd_at_fission': thd_at_fission,
                'corr_at_fission': corr_at_fission,
                'mean': np.mean(e_fission_values),
                'std': np.std(e_fission_values, ddof=1),
                'median': np.median(e_fission_values),
                'min': np.min(e_fission_values),
                'max': np.max(e_fission_values),
                'cv': (np.std(e_fission_values, ddof=1) / np.mean(e_fission_values)) * 100
            }
        else:
            results = {
                'frequency': frequency,
                'protocol': 'FSE',
                'n_replicas': 0,
                'mean': 0.0,
                'std': 0.0,
                'error': 'No se alcanzó criterio de fisión'
            }
        
        if verbose:
            print("\n" + "─"*80)
            print(f" ESTADÍSTICAS FSE - {frequency} Hz")
            print("─"*80)
            print(f" Media (M): {results['mean']:.3f} dB")
            print(f" Desv. Est. (SD): {results['std']:.3f} dB")
            print(f" Mediana: {results['median']:.3f} dB")
            print(f" Rango: [{results['min']:.2f}, {results['max']:.2f}] dB")
            print(f" Coef. Var. (CV): {results['cv']:.2f}%")
            print("─"*80 + "\n")
        
        return results
    
    def calculate_msc(self, track_a, track_b, target_freq, 
                     nperseg=4096, noverlap=2048):
        """
        Calcula Magnitude-Squared Coherence en frecuencia específica.
        
        Fórmula (página 20 del documento):
        MSC(f) = |Pxy(f)|² / (Pxx(f) · Pyy(f))
        
        Donde:
        - Pxy(f) = Cross-Spectral Density
        - Pxx(f), Pyy(f) = Auto-Spectral Densities
        
        Propiedades:
        - Rango: 0 ≤ MSC ≤ 1
        - MSC = 1: Coherencia perfecta
        - MSC < 0.5: Pérdida >50% de correlación espectral
        
        Args:
            track_a, track_b: Señales de audio
            target_freq: Frecuencia objetivo
            nperseg: Longitud de segmento (4096 = resolución ~11.7 Hz @ 48kHz)
            noverlap: Superposición (50% reduce varianza)
        
        Returns:
            tuple: (msc_value, frequencies, coherence_full)
        """
        # Método de Welch para estimación espectral robusta
        frequencies, coherence = signal.coherence(
            track_a, track_b,
            fs=self.sample_rate,
            window='hann', # Ventana Hann (óptimo compromiso)
            nperseg=nperseg,
            noverlap=noverlap,
            nfft=nperseg
        )
        
        # Buscar índice más cercano a frecuencia objetivo
        idx = np.argmin(np.abs(frequencies - target_freq))
        msc_value = coherence[idx]
        actual_freq = frequencies[idx]
        
        # Verificación de calidad
        freq_error = abs(actual_freq - target_freq)
        if freq_error > 5.0:
            print(f" ADVERTENCIA: Frecuencia analizada ({actual_freq:.2f} Hz) "
                  f"difiere de objetivo ({target_freq} Hz) en {freq_error:.2f} Hz")
        
        return msc_value, frequencies, coherence
    
    def protocol_fdt(self, frequency, n_replicas=10, verbose=True):
        """
        PROTOCOLO FDT: Fisión por Desincronización Temporal
        
        Procedimiento (páginas 16-22 del documento):
        1. Crear dos pistas idénticas del tono
        2. Aplicar offset temporal variable (Δt) a Pista B
        3. Calcular MSC (Magnitude-Squared Coherence)
        4. Buscar Δt que reduce MSC < 0.50 (umbral de fisión)
        5. Usar búsqueda binaria adaptativa para convergencia eficiente
        
        Fundamento teórico (efecto Haas):
        - Δt < 10-12 ms: Fusión perceptual (sonido único)
        - Δt > 20-30 ms: Eco discreto (dos eventos separados)
        - Δt ~ 12-15 ms: Zona de transición (fisión)
        
        Args:
            frequency: Frecuencia a evaluar (Hz)
            n_replicas: Número de réplicas
            verbose: Mostrar progreso
        
        Returns:
            dict: Resultados completos del protocolo
        """
        if verbose:
            print("\n" + "="*80)
            print(f" PROTOCOLO FDT - FISIÓN POR DESINCRONIZACIÓN TEMPORAL")
            print(f" Frecuencia: {frequency} Hz | Réplicas: {n_replicas}")
            print("="*80 + "\n")
        
        t_delta_values = []
        msc_at_threshold = []
        
        for replica in range(n_replicas):
            # Generar dos pistas idénticas
            track_a, _ = self.generate_pure_tone(frequency, duration=5.0)
            track_b, _ = self.generate_pure_tone(frequency, duration=5.0)
            
            # Ajuste de nivel para suma (-21 dBFS por pista → -18 dBFS suma)
            # 10^(-3/20) ≈ 0.708
            adjustment = 10 ** (-3 / 20)
            track_a = track_a * adjustment
            track_b = track_b * adjustment
            
            # Búsqueda binaria adaptativa
            dt_current = 10.0 # ms inicial (cerca de umbral Haas)
            step = 2.0 # paso inicial
            tolerance = 0.1 # precisión submilisegundo
            max_iterations = 20
            
            convergence_found = False
            
            for iteration in range(max_iterations):
                # Calcular offset en muestras
                offset_samples = int((dt_current / 1000) * self.sample_rate)
                
                # Validación de límites
                if offset_samples >= len(track_b):
                    dt_current -= step
                    step /= 2
                    continue
                
                # Crear Pista B con offset temporal
                track_b_offset = np.zeros_like(track_a)
                if offset_samples > 0:
                    track_b_offset[offset_samples:] = track_b[:-offset_samples]
                else:
                    track_b_offset = track_b.copy()
                
                # Calcular MSC en frecuencia fundamental
                msc, _, _ = self.calculate_msc(track_a, track_b_offset, frequency)
                
                # Criterio de convergencia: MSC cerca de 0.50
                if abs(msc - 0.50) < 0.02:
                    t_delta_values.append(dt_current)
                    msc_at_threshold.append(msc)
                    convergence_found = True
                    
                    if verbose:
                        print(f" Réplica {replica+1:2d}: T_Δ = {dt_current:6.2f} ms "
                              f"| MSC = {msc:.4f}")
                    break
                
                # Ajuste de búsqueda binaria
                if msc >= 0.50:
                    # Coherencia alta → aumentar delay
                    dt_current += step
                else:
                    # Coherencia baja → reducir delay
                    dt_current -= step / 2
                    step /= 2
                
                # Verificar tolerancia
                if step < tolerance:
                    t_delta_values.append(dt_current)
                    msc_at_threshold.append(msc)
                    convergence_found = True
                    
                    if verbose:
                        print(f" Réplica {replica+1:2d}: T_Δ = {dt_current:6.2f} ms "
                              f"(convergencia, MSC={msc:.4f})")
                    break
            
            if not convergence_found and verbose:
                print(f" Réplica {replica+1:2d}: NO convergió en {max_iterations} iteraciones")
        
        # Estadísticas descriptivas
        if len(t_delta_values) > 0:
            results = {
                'frequency': frequency,
                'protocol': 'FDT',
                'n_replicas': len(t_delta_values),
                't_delta_values': t_delta_values,
                'msc_at_threshold': msc_at_threshold,
                'mean': np.mean(t_delta_values),
                'std': np.std(t_delta_values, ddof=1),
                'median': np.median(t_delta_values),
                'min': np.min(t_delta_values),
                'max': np.max(t_delta_values),
                'cv': (np.std(t_delta_values, ddof=1) / np.mean(t_delta_values)) * 100
            }
        else:
            results = {
                'frequency': frequency,
                'protocol': 'FDT',
                'n_replicas': 0,
                'mean': 0.0,
                'std': 0.0,
                'error': 'No se alcanzó convergencia'
            }
        
        if verbose:
            print("\n" + "─"*80)
            print(f" ESTADÍSTICAS FDT - {frequency} Hz")
            print("─"*80)
            print(f" Media (M): {results['mean']:.3f} ms")
            print(f" Desv. Est. (SD): {results['std']:.3f} ms")
            print(f" Mediana: {results['median']:.3f} ms")
            print(f" Rango: [{results['min']:.2f}, {results['max']:.2f}] ms")
            print(f" Coef. Var. (CV): {results['cv']:.2f}%")
            print("─"*80 + "\n")
        
        return results
    
    def run_complete_analysis(self, frequency=432, n_replicas=10):
        """
        Ejecuta análisis completo: FSE + FDT
        
        Args:
            frequency: Frecuencia a analizar (default: 432 Hz)
            n_replicas: Número de réplicas por protocolo
        
        Returns:
            dict: Resultados completos de ambos protocolos
        """
        print("\n" + "╔" + "="*78 + "╗")
        print(f"║ ANÁLISIS COMPLETO DE ESTABILIDAD ACÚSTICA - {frequency} Hz" + " "*(78-len(f" ANÁLISIS COMPLETO DE ESTABILIDAD ACÚSTICA - {frequency} Hz")) + "║")
        print("╚" + "="*78 + "╝")
        
        # Protocolo FSE
        results_fse = self.protocol_fse(frequency, n_replicas, verbose=True)
        
        # Protocolo FDT
        results_fdt = self.protocol_fdt(frequency, n_replicas, verbose=True)
        
        # Consolidar resultados
        complete_results = {
            'frequency': frequency,
            'timestamp': datetime.now().isoformat(),
            'FSE': results_fse,
            'FDT': results_fdt,
            'summary': {
                'E_fission_mean': results_fse['mean'],
                'E_fission_std': results_fse['std'],
                'T_delta_mean': results_fdt['mean'],
                'T_delta_std': results_fdt['std']
            }
        }
        
        self.results[frequency] = complete_results
        
        return complete_results
    
    def visualize_results(self, frequency=432, save_fig=False, filename=None):
        """
        Genera visualizaciones científicas de los resultados.
        
        Args:
            frequency: Frecuencia a visualizar
            save_fig: Guardar figura como archivo
            filename: Nombre del archivo (opcional)
        """
        if frequency not in self.results:
            print(f"Error: No hay resultados para {frequency} Hz")
            return
        
        results = self.results[frequency]
        
        fig = plt.figure(figsize=(16, 10))
        gs = fig.add_gridspec(3, 3, hspace=0.3, wspace=0.3)
        
        # Título principal
        fig.suptitle(f'Análisis de Estabilidad Acústica - {frequency} Hz\n'
                     f'Protocolos FSE y FDT',
                     fontsize=16, fontweight='bold', y=0.98)
        
        # 1. Distribución E_fission
        ax1 = fig.add_subplot(gs[0, 0])
        if 'e_fission_values' in results['FSE']:
            values_fse = results['FSE']['e_fission_values']
            ax1.hist(values_fse, bins=8, edgecolor='black', alpha=0.7, color='steelblue')
            ax1.axvline(results['FSE']['mean'], color='red', linestyle='--', 
                       linewidth=2, label=f"Media: {results['FSE']['mean']:.2f} dB")
            ax1.set_xlabel('E_fission (dB)', fontsize=10)
            ax1.set_ylabel('Frecuencia', fontsize=10)
            ax1.set_title('Distribución de Energía de Fisión', fontsize=11, fontweight='bold')
            ax1.legend()
            ax1.grid(axis='y', alpha=0.3)
        
        # 2. Distribución T_delta
        ax2 = fig.add_subplot(gs[0, 1])
        if 't_delta_values' in results['FDT']:
            values_fdt = results['FDT']['t_delta_values']
            ax2.hist(values_fdt, bins=8, edgecolor='black', alpha=0.7, color='coral')
            ax2.axvline(results['FDT']['mean'], color='darkred', linestyle='--',
                       linewidth=2, label=f"Media: {results['FDT']['mean']:.2f} ms")
            ax2.set_xlabel('T_Δ (ms)', fontsize=10)
            ax2.set_ylabel('Frecuencia', fontsize=10)
            ax2.set_title('Distribución de Umbral Temporal', fontsize=11, fontweight='bold')
            ax2.legend()
            ax2.grid(axis='y', alpha=0.3)
        
        # 3. Boxplot comparativo
        ax3 = fig.add_subplot(gs[0, 2])
        if 'e_fission_values' in results['FSE'] and 't_delta_values' in results['FDT']:
            # Normalizar para comparación visual
            fse_norm = np.array(values_fse) / results['FSE']['mean'] * 100
            fdt_norm = np.array(values_fdt) / results['FDT']['mean'] * 100
            
            bp = ax3.boxplot([fse_norm, fdt_norm], 
                            labels=['E_fission', 'T_Δ'],
                            patch_artist=True,
                            notch=True)
            
            colors = ['lightblue', 'lightcoral']
            for patch, color in zip(bp['boxes'], colors):
                patch.set_facecolor(color)
            
            ax3.set_ylabel('Valor Normalizado (%)', fontsize=10)
            ax3.set_title('Variabilidad Relativa', fontsize=11, fontweight='bold')
            ax3.grid(axis='y', alpha=0.3)
        
        # 4. Serie temporal FSE
        ax4 = fig.add_subplot(gs[1, :])
        if 'e_fission_values' in results['FSE']:
            replicas = range(1, len(values_fse) + 1)
            ax4.plot(replicas, values_fse, 'o-', color='steelblue', 
                    linewidth=2, markersize=8, label='E_fission')
            ax4.axhline(results['FSE']['mean'], color='red', linestyle='--', 
                       alpha=0.7, label='Media')
            ax4.fill_between(replicas, 
                           results['FSE']['mean'] - results['FSE']['std'],
                           results['FSE']['mean'] + results['FSE']['std'],
                           alpha=0.2, color='red', label='±1 SD')
            ax4.set_xlabel('Número de Réplica', fontsize=10)
            ax4.set_ylabel('E_fission (dB)', fontsize=10)
            ax4.set_title('Serie Temporal - Protocolo FSE', fontsize=11, fontweight='bold')
            ax4.legend()
            ax4.grid(True, alpha=0.3)

Entradas más populares de este blog

grimorios

Firewall cognitivo.

La Crisis Epistémica: Cómo la Optimización para la Satisfacción del Usuario en Grandes Modelos de Lenguaje Crea Infraestructura Sistemática Post-Verdad