Source code for musif.extract.features.rhythm.handler

from collections import Counter, defaultdict
from itertools import groupby
from statistics import mean
from typing import List

import numpy as np
import pandas as pd

from musif.cache import hasattr
from musif.config import Configuration
from musif.extract.constants import DATA_PART_ABBREVIATION, GLOBAL_TIME_SIGNATURE
from musif.extract.features.core.constants import DATA_NOTES
from musif.extract.features.prefix import get_part_feature, get_score_feature
from musif.extract.utils import _get_beat_position
from musif.musicxml.tempo import get_number_of_beats

from .constants import *


[docs]def update_part_objects( score_data: dict, part_data: dict, cfg: Configuration, part_features: dict ): notes_duration = [note.duration.quarterLength for note in part_data["notes"]] beats = 1 beat_unit = 1 total_number_notes = 0 number_notes = 0 notes_dict = defaultdict(int) rhythm_intensity_period = [] rhythm_dot = 0 rhythm_double_dot = 0 total_beats = 0 total_sounding_beats = 0 beat_count = ( score_data[GLOBAL_TIME_SIGNATURE].beatCount if GLOBAL_TIME_SIGNATURE in score_data else 1 ) motion_features = get_motion_features(part_data) part_features.update(motion_features) for measure in part_data["measures"]: for i, element in enumerate(measure.elements): if element.classes[0] == "Note": number_notes += 1 notes_dict[element.duration.quarterLength] += 1 total_number_notes += 1 pos = _get_beat_position(beat_count, beats, element.beat) if ( element.duration.dots > 0 and element.duration.quarterLength < beat_unit ): if i + 1 < len(measure.elements): # check next item if ( measure.elements[i + 1].beatStr.split()[0] == element.beatStr.split()[0] ): if ( measure.elements[i + 1].duration.quarterLength < element.duration.quarterLength ): rhythm_dot += 1 if element.duration.dots == 2: rhythm_double_dot += 1 elif element.classes[0] == "TimeSignature": rhythm_intensity_period.append( sum([float(i) * j for i, j in Counter(notes_dict).items()]) / total_sounding_beats if total_sounding_beats != 0 else 0 ) number_notes = 0 beat_count = element.beatCount beats = get_number_of_beats(element.ratioString) beat_unit = element.beatStrength total_beats += beats if measure in part_data["sounding_measures"]: total_sounding_beats += beats rhythm_intensity_period.append( sum([j / i if i != 0.0 else 0 for i, j in Counter(notes_dict).items()]) / total_sounding_beats if total_sounding_beats else 0 ) notes_duration = [ i for i in notes_duration if i != 0.0 ] # remove notes with duration equal to 0 part_features.update( { AVERAGE_DURATION: mean(notes_duration) if len(notes_duration) != 0 else "NA", RHYTHMINT: sum(rhythm_intensity_period), DOTTEDRHYTHM: (rhythm_dot / total_sounding_beats) if total_sounding_beats else "NA", DOUBLE_DOTTEDRHYTHM: (rhythm_double_dot / total_beats), } )
[docs]def update_score_objects( score_data: dict, parts_data: List[dict], cfg: Configuration, parts_features: List[dict], score_features: dict, ): features = {} average_durations = [] rhythm_intensities = [] dotted_rhythm = [] double_dotted_rhythm = [] total_notes_duration = [] for part_data, part_features in zip(parts_data, parts_features): part = part_data[DATA_PART_ABBREVIATION] features[get_part_feature(part, AVERAGE_DURATION)] = part_features[ AVERAGE_DURATION ] total_notes_duration.append( [i.duration.quarterLength for i in part_data[DATA_NOTES] if i != 0] ) features[get_part_feature(part, RHYTHMINT)] = part_features[RHYTHMINT] # rhythm_intensities.append([1/i.duration.quarterLength for i in part_data[DATA_NOTES] if i !=0]) rhythm_intensities.append(part_features[RHYTHMINT]) features[get_part_feature(part, DOTTEDRHYTHM)] = part_features[DOTTEDRHYTHM] dotted_rhythm.append(part_features[DOTTEDRHYTHM]) features[get_part_feature(part, DOUBLE_DOTTEDRHYTHM)] = part_features[ DOUBLE_DOTTEDRHYTHM ] double_dotted_rhythm.append(part_features[DOUBLE_DOTTEDRHYTHM]) features[get_part_feature(part, SPEED_AVG_ABS)] = part_features[SPEED_AVG_ABS] features[get_part_feature(part, ACCELERATION_AVG_ABS)] = part_features[ ACCELERATION_AVG_ABS ] features[get_part_feature(part, ASCENDENT_AVERAGE)] = part_features[ ASCENDENT_AVERAGE ] features[get_part_feature(part, DESCENDENT_AVERAGE)] = part_features[ DESCENDENT_AVERAGE ] features[get_part_feature(part, ASCENDENT_PROPORTION)] = part_features[ ASCENDENT_PROPORTION ] features[get_part_feature(part, DESCENDENT_PROPORTION)] = part_features[ DESCENDENT_PROPORTION ] dotted_rhythm = [i for i in dotted_rhythm if i not in (0.0, "NA")] features.update( { # get_score_feature(AVERAGE_DURATION): mean(average_durations), get_score_feature(AVERAGE_DURATION): np.mean( [note for instrument in total_notes_duration for note in instrument] ), get_score_feature(RHYTHMINT): np.mean(rhythm_intensities) if rhythm_intensities else 0.0, # get_score_feature(RHYTHMINT): mean([rythm for instrument in rhythm_intensities for rythm in instrument])/score_features['Score_SoundingMeasures']*BEATS, get_score_feature(DOTTEDRHYTHM): np.mean(dotted_rhythm) if dotted_rhythm else 0.0, get_score_feature(DOUBLE_DOTTEDRHYTHM): np.mean(double_dotted_rhythm) if double_dotted_rhythm else 0.0, } ) score_features.update(features)
[docs]def get_motion_features(part_data) -> dict: notes_midi = [] notes_duration = [] for note in part_data["notes_and_rests"]: if hasattr(note, "pitch"): notes_midi.append(note.pitch.midi) notes_duration.append(note.duration.quarterLength) notes_midi = np.asarray(notes_midi) notes_duration = np.asarray(notes_duration) step = 0.125 midis_raw = np.repeat(notes_midi, [i / step for i in notes_duration], axis=0) spe_raw = np.diff(midis_raw) / step acc_raw = np.diff(spe_raw) / step # Absolute means of speed and acceleration spe_avg_abs = np.mean(abs(spe_raw)) acc_avg_abs = np.mean(abs(acc_raw)) # Rolling mean to smooth the midis by +-1 compasses -- not required for # statistics based on means but important for detecting increasing sequences # with a tolerance. measure = 4 midis_smo_series = pd.Series(midis_raw) midis_smo = [ np.mean(i.to_list()) for i in midis_smo_series.rolling(2 * measure + 1, center=True) ] # midis_smo = np.rollmean(midis_raw, k = 2 * compass + 1, align = "center") # spe_smo = np.diff(midis_smo) / step # acc_smo = np.diff(spe_smo) / step # Prolonged ascent/descent chunks in smoothed midis of the aria (allows for # small violations in the form of decrements/increments that do not # decrease/increase the rolling mean). dife = np.diff(midis_smo) asc = [(k, sum(1 for i in g)) for k, g in groupby(dife > 0)] dsc = [(k, sum(1 for i in g)) for k, g in groupby(dife < 0)] asc = [i for b, i in asc if b] dsc = [i for b, i in dsc if b] # Average length of ascent/descent chunks of the aria asc_avg = mean(asc) if asc else np.nan dsc_avg = mean(dsc) if dsc else np.nan # Proportion of ascent/descent chunks over the total of the aria asc_prp = sum(asc) / (len(dife) - 1) if asc else np.nan dsc_prp = sum(dsc) / (len(dife) - 1) if dsc else np.nan return { SPEED_AVG_ABS: spe_avg_abs, ACCELERATION_AVG_ABS: acc_avg_abs, ASCENDENT_AVERAGE: asc_avg, DESCENDENT_AVERAGE: dsc_avg, ASCENDENT_PROPORTION: asc_prp, DESCENDENT_PROPORTION: dsc_prp, }