"""Wav2Vec2 encoder with frame-rate resampling and a pooled classification head."""
from dataclasses import dataclass
from typing import Optional, Tuple

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn import BCEWithLogitsLoss, CrossEntropyLoss, MSELoss
from transformers import Wav2Vec2Model, Wav2Vec2PreTrainedModel
from transformers.file_utils import ModelOutput
from transformers.modeling_outputs import BaseModelOutput

_CONFIG_FOR_DOC = "Wav2Vec2Config"
_HIDDEN_STATES_START_POSITION = 2


# the implementation of Wav2Vec2Model is borrowed from
# https://huggingface.co/transformers/_modules/transformers/models/wav2vec2/modeling_wav2vec2.html#Wav2Vec2Model
# initialize our encoder with the pre-trained wav2vec 2.0 weights.
def _compute_mask_indices(
    shape: Tuple[int, int],
    mask_prob: float,
    mask_length: int,
    attention_mask: Optional[torch.Tensor] = None,
    min_masks: int = 0,
) -> np.ndarray:
    """Sample a boolean span mask of shape ``(batch, sequence_length)``.

    For every row, ``num_mask`` span starts are drawn without replacement and
    each start is expanded to ``mask_length`` consecutive positions. All rows
    end up with the same number of masked positions (the minimum over rows).

    Args:
        shape: ``(batch_size, sequence_length)`` of the mask to build.
        mask_prob: Scales the number of spans:
            ``int(mask_prob * length / mask_length + U[0, 1))``.
        mask_length: Number of consecutive positions in each span (>= 1).
        attention_mask: Optional tensor whose entries equal to 1 mark valid
            positions; other entries are treated as padding and reduce the
            usable length of the row.
        min_masks: Lower bound on the number of spans per row.

    Returns:
        Boolean ``np.ndarray`` of the requested shape; ``True`` means masked.

    Raises:
        ValueError: If ``mask_length`` is smaller than 1.
    """
    if mask_length < 1:
        raise ValueError("`mask_length` has to be bigger than 0.")

    bsz, all_sz = shape
    mask = np.full((bsz, all_sz), False)

    # This draw happens even when per-row counts are recomputed below, so the
    # sequence of random numbers consumed stays the same as before.
    all_num_mask = int(mask_prob * all_sz / float(mask_length) + np.random.rand())
    all_num_mask = max(min_masks, all_num_mask)

    padding_mask = attention_mask.ne(1) if attention_mask is not None else None
    span_offsets = np.arange(mask_length)

    mask_idcs = []
    for i in range(bsz):
        if padding_mask is not None:
            sz = all_sz - padding_mask[i].long().sum().item()
            num_mask = int(mask_prob * sz / float(mask_length) + np.random.rand())
            num_mask = max(min_masks, num_mask)
        else:
            sz = all_sz
            num_mask = all_num_mask

        if num_mask == 0:
            # Nothing to mask in this row. The previous implementation indexed
            # into an empty ``lengths`` array here and raised IndexError.
            mask_idcs.append(np.empty(0, dtype=np.int64))
            continue

        # Keep span starts far enough from the end of the row, unless that
        # would leave fewer candidate starts than spans to draw.
        min_len = mask_length
        if sz - min_len <= num_mask:
            min_len = sz - num_mask - 1

        starts = np.random.choice(sz - min_len, num_mask, replace=False)
        # Expand every start into ``mask_length`` consecutive indices.
        mask_idc = (starts[:, None] + span_offsets[None, :]).ravel()
        mask_idcs.append(np.unique(mask_idc[mask_idc < sz]))

    if not mask_idcs:
        # Empty batch: there is nothing to equalise or to mask.
        return mask

    # Equalise the number of masked positions across the batch.
    min_len = min(len(m) for m in mask_idcs)
    for i, mask_idc in enumerate(mask_idcs):
        if len(mask_idc) > min_len:
            mask_idc = np.random.choice(mask_idc, min_len, replace=False)
        mask[i, mask_idc] = True

    return mask


# linear interpolation layer
def linear_interpolation(features, input_fps, output_fps, output_len=None):
    """Linearly resample ``features`` along dimension 1.

    Args:
        features: 3-D tensor; dimension 1 is resampled (it is swapped with
            dimension 2 before and after calling ``F.interpolate``).
        input_fps: Frame rate of ``features``.
        output_fps: Target frame rate; only used when ``output_len`` is None.
        output_len: Exact output length. When None it is derived as
            ``int(features.shape[1] / input_fps * output_fps)``.

    Returns:
        Tensor with dimension 1 resized to ``output_len``.
    """
    features = features.transpose(1, 2)
    seq_len = features.shape[2] / float(input_fps)
    if output_len is None:
        output_len = int(seq_len * output_fps)
    output_features = F.interpolate(features, size=output_len, align_corners=True, mode='linear')
    return output_features.transpose(1, 2)


class Wav2Vec2Model(Wav2Vec2Model):
    """Wav2Vec2 encoder that resamples extracted features before encoding.

    Deliberately shadows the imported ``transformers.Wav2Vec2Model`` so the
    rest of the module picks up this subclass under the same name.
    """

    def __init__(self, config):
        super().__init__(config)
        # NOTE(review): sizes are hard-coded and the layer is not used in
        # ``forward``; presumably kept so checkpoints containing ``lm_head``
        # weights load cleanly -- confirm before changing.
        self.lm_head = nn.Linear(1024, 32)

    def forward(
        self,
        input_values,
        attention_mask=None,
        output_attentions=None,
        output_hidden_states=None,
        return_dict=None,
        frame_num=None,
    ):
        """Encode raw audio, resampling the extracted features from 50 to 30 fps.

        Args:
            input_values: Input passed to ``self.feature_extractor``.
            attention_mask: Optional sample-level mask; its row sums are used
                as the valid input lengths.
            output_attentions: Overrides ``config.output_attentions``.
            output_hidden_states: Overrides ``config.output_hidden_states``.
            return_dict: Overrides ``config.use_return_dict``.
            frame_num: Exact number of output frames; when None the length is
                derived from the 50 -> 30 fps ratio.

        Returns:
            ``BaseModelOutput`` when ``return_dict`` is truthy, otherwise the
            tuple ``(last_hidden_state,) + encoder_outputs[1:]``.
        """
        # NOTE(review): this mutates the shared config on every call, so
        # attentions are returned unless the caller passes
        # ``output_attentions=False`` explicitly. Kept for compatibility.
        self.config.output_attentions = True
        output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions
        output_hidden_states = (
            output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states
        )
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        hidden_states = self.feature_extractor(input_values)
        hidden_states = hidden_states.transpose(1, 2)
        num_extracted_frames = hidden_states.shape[1]
        hidden_states = linear_interpolation(hidden_states, 50, 30, output_len=frame_num)
        num_frames = hidden_states.shape[1]

        if attention_mask is not None:
            # Valid lengths at the feature extractor's frame rate.
            output_lengths = self._get_feat_extract_output_lengths(attention_mask.sum(-1))
            # The sequence was resampled above, so rescale the lengths to the
            # new frame count. Using them unscaled indexes past the end of the
            # resampled sequence for full-length rows.
            output_lengths = torch.ceil(
                output_lengths.float() * num_frames / num_extracted_frames
            ).long()
            output_lengths = output_lengths.clamp(min=1, max=num_frames).to(hidden_states.device)

            attention_mask = torch.zeros(
                hidden_states.shape[:2], dtype=hidden_states.dtype, device=hidden_states.device
            )
            # Mark the last valid frame of each row, then fill everything
            # before it via a reversed cumulative sum.
            attention_mask[
                (torch.arange(attention_mask.shape[0], device=hidden_states.device), output_lengths - 1)
            ] = 1
            attention_mask = attention_mask.flip([-1]).cumsum(-1).flip([-1]).bool()

        hidden_states = self.feature_projection(hidden_states)[0]

        encoder_outputs = self.encoder(
            hidden_states,
            attention_mask=attention_mask,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )
        hidden_states = encoder_outputs[0]
        if not return_dict:
            return (hidden_states,) + encoder_outputs[1:]

        return BaseModelOutput(
            last_hidden_state=hidden_states,
            hidden_states=encoder_outputs.hidden_states,
            attentions=encoder_outputs.attentions,
        )


@dataclass
class SpeechClassifierOutput(ModelOutput):
    """Output container returned by ``Wav2Vec2ForSpeechClassification``."""

    # Loss value; None when no labels were supplied.
    loss: Optional[torch.FloatTensor] = None
    # Classifier outputs computed from the pooled features.
    logits: torch.FloatTensor = None
    # Filled by the classifier with the resampled frame-level features.
    hidden_states: Optional[Tuple[torch.FloatTensor]] = None
    # Attentions forwarded from the wrapped encoder.
    attentions: Optional[Tuple[torch.FloatTensor]] = None


class Wav2Vec2ClassificationHead(nn.Module):
    """Head for wav2vec classification task."""

    def __init__(self, config):
        super().__init__()
        self.dense = nn.Linear(config.hidden_size, config.hidden_size)
        self.dropout = nn.Dropout(config.final_dropout)
        self.out_proj = nn.Linear(config.hidden_size, config.num_labels)

    def forward(self, features, **kwargs):
        """Apply dropout -> dense -> tanh -> dropout -> output projection."""
        x = features
        x = self.dropout(x)
        x = self.dense(x)
        x = torch.tanh(x)
        x = self.dropout(x)
        x = self.out_proj(x)
        return x


class Wav2Vec2ForSpeechClassification(Wav2Vec2PreTrainedModel):
    """Wav2Vec2 encoder followed by pooling over dimension 1 and a classifier.

    Reads ``config.pooling_mode`` (one of ``'mean'``, ``'sum'``, ``'max'``)
    and ``config.num_labels``.
    """

    def __init__(self, config):
        super().__init__(config)
        self.num_labels = config.num_labels
        self.pooling_mode = config.pooling_mode
        self.config = config

        self.wav2vec2 = Wav2Vec2Model(config)
        self.classifier = Wav2Vec2ClassificationHead(config)

        self.init_weights()

    def freeze_feature_extractor(self):
        """Freeze the parameters of the wrapped encoder's feature extractor."""
        self.wav2vec2.feature_extractor._freeze_parameters()

    def merged_strategy(
        self,
        hidden_states,
        mode="mean"
    ):
        """Pool ``hidden_states`` over dimension 1.

        Args:
            hidden_states: Tensor to reduce along dimension 1.
            mode: ``'mean'``, ``'sum'`` or ``'max'``.

        Raises:
            Exception: If ``mode`` is not one of the supported values.
        """
        if mode == "mean":
            outputs = torch.mean(hidden_states, dim=1)
        elif mode == "sum":
            outputs = torch.sum(hidden_states, dim=1)
        elif mode == "max":
            outputs = torch.max(hidden_states, dim=1)[0]
        else:
            raise Exception(
                "The pooling method hasn't been defined! Your pooling mode must be one of these ['mean', 'sum', 'max']")

        return outputs

    def forward(
        self,
        input_values,
        attention_mask=None,
        output_attentions=None,
        output_hidden_states=None,
        return_dict=None,
        labels=None,
        frame_num=None,
    ):
        """Classify audio and optionally compute a loss.

        Args:
            input_values: Input forwarded to the wrapped encoder.
            attention_mask: Optional mask forwarded to the wrapped encoder.
            output_attentions: Forwarded to the wrapped encoder.
            output_hidden_states: Forwarded to the wrapped encoder.
            return_dict: Overrides ``config.use_return_dict``.
            labels: Optional targets. When given, the loss is chosen from
                ``config.problem_type``, which is inferred from ``num_labels``
                and the label dtype if unset (and then stored on the config).
            frame_num: Exact length of the resampled encoder output that is
                pooled; when None it is derived from the 50 -> 30 fps ratio.

        Returns:
            ``SpeechClassifierOutput`` when ``return_dict`` is truthy,
            otherwise ``(loss?, logits, *extra_encoder_outputs)``.
        """
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict
        outputs = self.wav2vec2(
            input_values,
            attention_mask=attention_mask,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )
        hidden_states = outputs[0]
        # NOTE(review): the encoder already resamples its features and
        # ``frame_num`` is not forwarded to it, so this is a second
        # resampling step. Kept as-is since trained weights may depend on
        # it -- confirm intent.
        hidden_states1 = linear_interpolation(hidden_states, 50, 30, output_len=frame_num)
        hidden_states = self.merged_strategy(hidden_states1, mode=self.pooling_mode)
        logits = self.classifier(hidden_states)

        loss = None
        if labels is not None:
            if self.config.problem_type is None:
                if self.num_labels == 1:
                    self.config.problem_type = "regression"
                elif self.num_labels > 1 and (labels.dtype == torch.long or labels.dtype == torch.int):
                    self.config.problem_type = "single_label_classification"
                else:
                    self.config.problem_type = "multi_label_classification"

            if self.config.problem_type == "regression":
                loss_fct = MSELoss()
                loss = loss_fct(logits.view(-1, self.num_labels), labels)
            elif self.config.problem_type == "single_label_classification":
                loss_fct = CrossEntropyLoss()
                loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1))
            elif self.config.problem_type == "multi_label_classification":
                loss_fct = BCEWithLogitsLoss()
                loss = loss_fct(logits, labels)

        if not return_dict:
            # The wrapped encoder returns ``(last_hidden_state, *extras)``
            # with no ``extract_features`` entry, so the extras start at
            # index 1. Slicing from 2 dropped the first extra output.
            output = (logits,) + outputs[1:]
            return ((loss,) + output) if loss is not None else output

        return SpeechClassifierOutput(
            loss=loss,
            logits=logits,
            hidden_states=hidden_states1,
            attentions=outputs.attentions,
        )