import itertools
import os
from typing import Tuple, Union

import ms3
import music21 as m21
import pandas as pd
from pandas import DataFrame

from musif.cache import isinstance
from musif.extract.constants import PLAYTHROUGH
from musif.musicxml.tempo import get_number_of_beats
# Module-level list; no function visible in this section reads or writes it.
file_names = []
# Module-level flag: get_repetition_elements() resets it to False and sets it to True
# when it meets a RepeatBracket spanner; _get_measures_list() reads it.
repeat_bracket = False
def process_musescore_file(file_path: str, expand_repeats: bool = False) -> DataFrame:
    """
    Parse a MuseScore file with ms3 and return its expanded harmonic table.

    A playthrough column (named by the ``PLAYTHROUGH`` constant) and a ``beats``
    column are added to the table before it is returned.

    Parameters
    ----------
    file_path: str
        Path to the mscx file.
    expand_repeats: bool
        If True, the table is unfolded with ``ms3.parse.unfold_repeats`` and its
        ``mc_playthrough`` column is renamed to ``PLAYTHROUGH``. If False, the
        playthrough column is a copy of ``mc`` when the first ``mn`` value is 0,
        and a copy of ``mn`` otherwise.

    Returns
    -------
    harmonic_analysis: DataFrame
        Dataframe containing the harmonic information.
    """
    parsed = ms3.score.Score(file_path, logger_cfg={"level": "ERROR"})
    harmonic_analysis = parsed.mscx.expanded
    harmonic_analysis.reset_index(inplace=True)

    if not expand_repeats:
        source_column = "mc" if harmonic_analysis.mn[0] == 0 else "mn"
        harmonic_analysis[PLAYTHROUGH] = harmonic_analysis[source_column]
    else:
        playing_order = ms3.parse.next2sequence(
            parsed.mscx.measures.set_index("mc").next
        )
        playing_order = pd.Series(playing_order, name="mc_playthrough")
        harmonic_analysis = ms3.parse.unfold_repeats(harmonic_analysis, playing_order)
        harmonic_analysis.rename(columns={"mc_playthrough": PLAYTHROUGH}, inplace=True)

    _include_beats_column(harmonic_analysis)
    return harmonic_analysis
def expand_score_repetitions(score, repeat_elements: list):
    """
    Expand a music21 score so that its measures appear in playing order.

    Repeat barlines are expanded first via ``_expand_repeat_bars``; then, if any
    repetition elements were supplied, every part is rebuilt with ``_expand_part``.

    Parameters
    ----------
    score: music21 Score
        Score object parsed by music21.
    repeat_elements: list
        List containing all repetition elements.

    Returns
    -------
    final_score: music21 Score
        A new Score holding the expanded parts, or the score returned by
        ``_expand_repeat_bars`` when ``repeat_elements`` is empty.
    """
    score = _expand_repeat_bars(score)
    expanded = m21.stream.Score()
    expanded.metadata = score.metadata
    if len(repeat_elements) == 0:
        # Nothing else to unfold: hand back the bar-expanded score itself.
        return score
    for source_part in score.parts:
        expanded.insert(0, _expand_part(source_part, repeat_elements))
    return expanded
def _measure_ranges(
    instrument_measures: list,
    init: int,
    end: int,
    iteration: int = None,
    offset: int = None,
    twocompasses_flag: bool = False,
    remove_repetition_marks_flag: bool = False,
) -> list:
    """
    Copy the measures numbered from ``init`` to ``end`` (inclusive) into new Measure objects.

    Parameters
    ----------
    instrument_measures: list
        Measures of one part; indexed by position and read through
        ``measureNumber``, ``quarterLength``, ``offset`` and ``elements``.
    init: int
        Number of the first measure to copy.
    end: int
        Number of the last measure to copy.
    iteration: int
        When equal to 2, the last copied measure is replaced by the source
        measure that follows the last visited index (presumably to swap a first
        ending for the second one -- TODO confirm).
    offset: int
        Offset given to the first copied measure; later copies are placed
        consecutively. When None, offsets are derived from the source measures.
    twocompasses_flag: bool
        When True, the first measure inside the range is skipped.
    remove_repetition_marks_flag: bool
        When True, ``RepeatMark`` elements are left out of the copies.

    Returns
    -------
    measures: list
        The copied measures, in source order.
    """
    measures = []
    o = offset
    # NOTE(review): the constant 6 is not explained anywhere in this file.
    last_offset = (
        0.0 if int(init) - 6 < 0 else instrument_measures[int(init) - 6].offset
    )
    init_index, end_index = _find_init_and_end_indexes(instrument_measures, init, end)
    for i in range(init_index, end_index + 1):
        if (
            not i < 0
            and i < len(instrument_measures)
            and instrument_measures[i].measureNumber >= int(init)
            and instrument_measures[i].measureNumber <= int(end)
        ):
            if not twocompasses_flag:
                compass = instrument_measures[i].quarterLength
                m = m21.stream.Measure(number=instrument_measures[i].measureNumber)
                if remove_repetition_marks_flag:
                    m.elements = [
                        e
                        for e in instrument_measures[i].elements
                        if not isinstance(e, m21.repeat.RepeatMark)
                    ]
                else:
                    m.elements = instrument_measures[i].elements
                m.quarterLength = compass
                if offset is None:
                    m.offset = last_offset
                else:
                    # Explicit start offset: lay the copies out back to back.
                    m.offset = o
                    o += compass
                measures.append(m)
                if (
                    instrument_measures[i].measureNumber != 0.0
                    and instrument_measures[i].offset != 0
                ):
                    last_offset = instrument_measures[i].offset + compass
            # Only the first measure in range is affected by the flag.
            twocompasses_flag = False
    if iteration == 2:
        # NOTE(review): i + 1 is not bounds-checked, and the source measure's own
        # offset is overwritten here.
        last_measure = instrument_measures[i + 1]
        last_measure.offset = measures[-1].offset
        measures = measures[:-1] + [last_measure]
    return measures
def _find_init_and_end_indexes(
instrument_measures: list, init: int, end: int
) -> Union[int, int]:
init_index = instrument_measures.index(
[m for m in instrument_measures if m.measureNumber == init][0]
)
end_compass = [m for m in instrument_measures if m.measureNumber == end]
end_index = (
instrument_measures.index(end_compass[0])
if len(end_compass) > 0
else len(instrument_measures) - 1
)
return init_index, end_index
def _get_instrument_elements(part):
    """
    Collect the Measure objects found in ``part`` and re-space their notes.

    Inside every measure, each Note, Rest or Chord is moved to the offset where
    the previous one ends (previous offset + previous quarter length), starting
    at 0. The measures are modified in place.

    Parameters
    ----------
    part: iterable
        Elements of a part; only ``music21.stream.Measure`` items are kept.

    Returns
    -------
    measures: list
        The measures found, in their original order.
    """
    collected = []
    for candidate in part:
        if not isinstance(candidate, m21.stream.Measure):
            continue
        collected.append(candidate)
        # Change the note offsets to avoid problems due to the slurs
        cursor = 0
        previous_length = 0
        for item in candidate:
            is_timed = (
                isinstance(item, m21.note.Note)
                or isinstance(item, m21.note.Rest)
                or isinstance(item, m21.chord.Chord)
            )
            if not is_timed:
                continue
            item.offset = cursor + previous_length
            cursor = item.offset
            previous_length = item.duration.quarterLength
    return collected
def get_repetition_elements(score, v=True):
    """
    List the repetition marks and repeat brackets found in a score.

    Also updates the module-level ``repeat_bracket`` flag: it is reset to False
    and set to True as soon as a ``RepeatBracket`` spanner is found.

    Parameters
    ----------
    score: music21 Score
        Score whose parts are scanned.
    v: bool
        When True, the result is printed.

    Returns
    -------
    repeat_elements: list
        Unique ``(measure_number, name)`` tuples sorted by measure number.
        Brackets are named ``"repeat bracket"`` followed by the bracket number.
    """
    global repeat_bracket
    repeat_bracket = False
    found = set()
    for part in score.parts:
        part_marks = []
        for item in part.elements:
            if isinstance(item, m21.stream.Measure):
                for mark in item:
                    # Repeat barlines are RepeatMarks too, but are not collected here.
                    if not isinstance(mark, m21.repeat.RepeatMark) or isinstance(
                        mark, m21.bar.Repeat
                    ):
                        continue
                    number = mark.measureNumber
                    if item.numberSuffix in ["X1", "X2"]:  # Exception
                        number += 1
                    part_marks.append((number, mark.name))
            elif isinstance(item, m21.spanner.RepeatBracket):
                repeat_bracket = True
                # The measure number is read from the textual form of the
                # bracket: the token right after the first "music21.stream.Measure ".
                text = str(item)
                text = text[text.find("music21.stream.Measure"):]
                text = text.replace("music21.stream.Measure ", "")
                first_token = text.split(" ")[0].strip()
                number = first_token.replace("X1", "").replace("X2", "")
                part_marks.append((int(number), "repeat bracket" + item.number))
        found.update(part_marks)
    ordered = sorted(found, key=lambda pair: pair[0])
    if v:
        print("The repeat elements found in this score are: " + str(ordered))
    return ordered
def _include_beats_column(harmonic_analysis: DataFrame) -> None:
    """
    Add an integer ``beats`` column to ``harmonic_analysis`` in place.

    For rows whose playthrough value is at most 1, the beat is the playthrough
    value plus the onset (``mc_onset``) scaled by the row's number of beats.
    For later rows, the beats of the preceding measures are counted using the
    previous row's time signature, then 1 and the scaled onset are added.

    Parameters
    ----------
    harmonic_analysis: DataFrame
        Table with ``PLAYTHROUGH``, ``mc_onset`` and ``timesig`` columns.
    """
    harmonic_analysis["beats"] = 0
    for row, playthrough in enumerate(harmonic_analysis[PLAYTHROUGH].values):
        if playthrough <= 1:
            within_measure = float(
                harmonic_analysis.mc_onset[row]
            ) * get_number_of_beats(harmonic_analysis.timesig[row])
            beat = int(playthrough + within_measure)
        else:
            previous_beats = get_number_of_beats(harmonic_analysis.timesig[row - 1])
            elapsed = (playthrough - 1) * previous_beats + 1
            within_measure = harmonic_analysis.mc_onset[row] * get_number_of_beats(
                harmonic_analysis.timesig[row]
            )
            beat = int(elapsed + within_measure)
        harmonic_analysis.at[row, "beats"] = beat
def _get_beat_position(
beats_timesignature: int, number_of_beats: int, pos: int
) -> float:
if number_of_beats == beats_timesignature:
return pos
else:
return (pos / beats_timesignature) + 1
def _expand_repeat_bars(score):
    """
    Expand the repeat barlines of every part of ``score``.

    Parameters
    ----------
    score: music21 Score
        Score whose parts are scanned for repeat barlines and repeat brackets.

    Returns
    -------
    music21 Score
        A new Score with the expanded parts when at least one repeat barline was
        found; otherwise the score that was passed in.
    """
    final_score = m21.stream.Score()
    final_score.metadata = score.metadata
    # NOTE(review): this flag is shared by all parts and never reset, so parts
    # scanned before the first repeat barline is seen are not appended.
    exist_repetition_bars = False
    # find repeat bars and expand
    for instr in score.parts:
        part_measures = _get_instrument_elements(
            instr.elements
        )  # returns the measures with repetitions
        last_measure = part_measures[-1].measureNumber
        part_measures_expanded = []
        startsin0 = part_measures[0].measureNumber == 0  # Everything should be -1
        repetition_bars = []
        # Find all repetitions in that instrument
        for elem in instr.elements:
            if isinstance(elem, m21.stream.Measure):
                # Accumulate instead of overwriting: a later measure without a
                # repeat barline must not clear a repeat found earlier.
                if _examine_measure(repetition_bars, elem):
                    exist_repetition_bars = True
            elif isinstance(elem, m21.spanner.RepeatBracket):
                _examine_repeat_bracket(instr, repetition_bars, elem)
        repetition_bars = sorted(repetition_bars, key=lambda tup: tup[0])
        start = 0 if startsin0 else 1
        _append_repetitions(
            final_score,
            exist_repetition_bars,
            instr,
            part_measures,
            last_measure,
            part_measures_expanded,
            repetition_bars,
            start,
        )
    return final_score if exist_repetition_bars else score
def _append_repetitions(
final_score,
exist_repetition_bars,
instr,
part_measures,
last_measure,
part_measures_expanded,
repetition_bars,
start,
):
if exist_repetition_bars:
p = m21.stream.Part()
p.id = instr.id
p.partName = instr.partName
for repetition_bar in repetition_bars:
measure = _measure_ranges(
part_measures, repetition_bar[0], repetition_bar[0]
)[0].quarterLength
if repetition_bar[1] == "start":
start = _add_start(
part_measures,
part_measures_expanded,
start,
repetition_bar,
measure,
)
elif repetition_bar[1] == "end":
start = _add_end(
part_measures,
part_measures_expanded,
repetition_bars,
start,
repetition_bar,
measure,
)
if start < last_measure:
measure = _measure_ranges(part_measures, start, start + 1)[0].quarterLength
offset = part_measures_expanded[-1][-1].offset
part_measures_expanded.append(
_measure_ranges(
part_measures, start, last_measure + 1, offset=offset + measure
)
)
p.elements = list(itertools.chain(*tuple(part_measures_expanded)))
final_score.insert(0, p)
def _add_end(
    part_measures,
    part_measures_expanded,
    repetition_bars,
    start,
    repetition_bar,
    measure,
):
    """
    Append both passes of a repeated section that closes at ``repetition_bar``.

    The first pass copies measures ``start`` .. ``repetition_bar[0]`` without
    repetition marks. The second pass copies the same range again, unless a
    first ending ("1") exists at or before the end bar and a second ending
    ("2") exists after it; then the second pass runs up to the second ending
    with ``iteration=2``.

    Parameters
    ----------
    part_measures: list
        Source measures of the part.
    part_measures_expanded: list
        List of measure lists built so far; extended in place.
    repetition_bars: list
        All ``(measure_number, kind)`` entries of the part.
    start: int
        Measure number where the repeated section begins.
    repetition_bar: tuple
        The "end" entry being processed.
    measure: number
        Quarter length added to the previous offset to place each pass.

    Returns
    -------
    int
        Measure number from which expansion continues.
    """
    offset = part_measures_expanded[-1][-1].offset if part_measures_expanded else 0
    end_number = repetition_bar[0]

    has_first_ending = any(
        bar[1] == "1" and bar[0] <= end_number for bar in repetition_bars
    )
    second_ending = None
    if has_first_ending:
        later_seconds = [
            bar for bar in repetition_bars if bar[1] == "2" and bar[0] > end_number
        ]
        second_ending = later_seconds[0] if later_seconds else None

    # First pass; the repetition marks are dropped from the copies.
    part_measures_expanded.append(
        _measure_ranges(
            part_measures,
            init=start,
            end=end_number,
            offset=offset + measure,
            remove_repetition_marks_flag=True,
        )
    )

    second_pass_offset = part_measures_expanded[-1][-1].offset + measure
    if second_ending is None:
        part_measures_expanded.append(
            _measure_ranges(
                part_measures,
                init=start,
                end=end_number,
                offset=second_pass_offset,
            )
        )
        return end_number + 1

    part_measures_expanded.append(
        _measure_ranges(
            part_measures,
            start,
            second_ending[0],
            iteration=2,
            offset=second_pass_offset,
        )
    )
    return second_ending[0] + 1
def _add_start(part_measures, part_measures_expanded, start, repetition_bar, measure):
    """
    Append the measures that precede a repeat-start bar.

    Measures ``start`` .. ``repetition_bar[0] - 1`` are copied right after the
    last measure already expanded (or from offset 0 plus ``measure`` when
    nothing has been expanded yet). An empty range appends nothing.

    Returns
    -------
    int
        ``repetition_bar[0]``, the measure where the repeated section begins.
    """
    previous_offset = (
        part_measures_expanded[-1][-1].offset if part_measures_expanded else 0
    )
    lead_in = _measure_ranges(
        part_measures, start, repetition_bar[0] - 1, offset=previous_offset + measure
    )
    if lead_in:
        part_measures_expanded.append(lead_in)
    return repetition_bar[0]
def _examine_repeat_bracket(instr, repetition_bars, elem):
string_e = str(elem)
index = string_e.find("music21.stream.Measure")
measure = string_e[index:].replace("music21.stream.Measure", "")[1:3].strip()
repetition_bars.append((int(measure), elem.number))
index = instr.elements.index(elem)
elem.elements = instr.elements[:index] + instr.elements[index + 1 :]
def _examine_measure(repetition_bars, elem):
for e in elem:
if isinstance(e, m21.bar.Repeat):
exist_repetition_bars = True
if e.direction == "start":
repetition_bars.append((e.measureNumber, "start"))
elif e.direction == "end":
repetition_bars.append((e.measureNumber, "end"))
index = elem.elements.index(e)
elem.elements = elem.elements[:index] + elem.elements[index + 1 :]
return exist_repetition_bars
def _get_measures_list(part_measures: list, repeat_elements: list):
    """
    Lay out the measures of a part in playing order, following its repetition marks.

    Parameters
    ----------
    part_measures: list
        Measures of one part.
    repeat_elements: list
        ``(measure_number, name)`` tuples. The names handled here are "fine",
        "segno", "al segno", "dal segno" and "da capo"; other names are ignored.

    Returns
    -------
    measures_list: list
        Lists of measures produced by ``_measure_ranges``, in the order they
        were appended.

    Notes
    -----
    Reads the module-level ``repeat_bracket`` flag to choose the ``iteration``
    argument passed to ``_measure_ranges``.
    """
    measures_list = []
    startsin0 = part_measures[0].measureNumber == 0  # Everything should be -1
    there_is_fine = False
    there_is_segno = False
    # 1. find the fine and segno
    # NOTE(review): f and s are only bound when the matching mark exists; the
    # branches below that use them rely on the there_is_* flags or on the mark
    # being present in repeat_elements.
    if any([r[1] == "fine" for r in repeat_elements]):
        f = [x[0] for x in repeat_elements if x[1] == "fine"][0]
        there_is_fine = True
    if any([r[1] == "segno" for r in repeat_elements]):
        s = [x[0] for x in repeat_elements if x[1] == "segno"][0]
        there_is_segno = True
    # Having all the repetition elements, get the measures
    # First section: everything before the segno, else before the fine, else
    # the whole part.
    if there_is_segno:
        before_segno = _measure_ranges(part_measures, 1 if not startsin0 else 0, s - 1)
        measures_list.append(
            before_segno
        )  # S -1 OR S-> when segno in compass 1, s, else s-1?
        # Time signatures found before the segno; only bound in this branch.
        dc_time_signature = [
            y
            for x in before_segno
            for y in x.elements
            if isinstance(y, m21.meter.TimeSignature)
        ]
    elif there_is_fine:
        measures_list.append(
            _measure_ranges(
                part_measures,
                1 if not startsin0 else 0,
                f - 1,
                iteration=1 if repeat_bracket else None,
            )
        )
    else:
        measures_list.append(
            _measure_ranges(
                part_measures,
                1 if not startsin0 else 0,
                len(part_measures),
                iteration=1 if repeat_bracket else None,
            )
        )
    # Then one further section per repetition element, in the order given.
    for repeat in repeat_elements:
        repeat_measure = _measure_ranges(part_measures, repeat[0], repeat[0])
        # Quarter length of the measure carrying the mark; added to the offset
        # of the last measure placed so far to position the next section.
        compass = repeat_measure[0].quarterLength
        if repeat[1] == "segno":
            offset = measures_list[-1][-1].offset
            segno_part = _measure_ranges(
                part_measures,
                s,
                f - 1 if there_is_fine else len(part_measures),
                iteration=1 if repeat_bracket else None,
                offset=offset + compass,
                remove_repetition_marks_flag=True,
            )
            measures_list.append(segno_part)  # Segno to Fine
        elif repeat[1] == "fine":
            twoCompasses = False
            # The bare string below is disabled code left in place; it has no
            # runtime effect.
            """if len(repeat_measure) > 0:
twoCompasses = True"""
            offset = measures_list[-1][-1].offset
            fine_part = _measure_ranges(
                part_measures,
                f,
                len(part_measures),
                offset=offset + compass,
                twocompasses_flag=twoCompasses,
                remove_repetition_marks_flag=True,
            )
            measures_list.append(fine_part)  # Fine to end
        elif repeat[1] == "al segno" or repeat[1] == "dal segno":
            offset = measures_list[-1][-1].offset
            # segnos' compass time signature
            # NOTE(review): segno_part is only bound once a "segno" entry has
            # been processed earlier in this loop.
            segno_time_measure = [
                x
                for x in segno_part[0].elements
                if isinstance(x, m21.meter.TimeSignature)
            ]
            # NOTE(review): this is a list when the segno measure has its own
            # time signature, but a single element of dc_time_signature
            # otherwise; it is wrapped in another list below either way.
            segno_time_measure = (
                segno_time_measure
                if len(segno_time_measure) != 0
                else dc_time_signature[-1]
            )
            alsegno_list = _measure_ranges(
                part_measures,
                s,
                f - 1 if there_is_fine else len(part_measures),
                iteration=2 if repeat_bracket else None,
                offset=offset + compass,
                remove_repetition_marks_flag=True,
            )
            if not any(
                isinstance(x, m21.meter.TimeSignature) for x in alsegno_list[0].elements
            ):
                # we reset the time signature that was on the dacapo
                alsegno_list[0].elements = tuple(
                    [segno_time_measure] + list(alsegno_list[0].elements)
                )
            measures_list.append(alsegno_list)  # Segno to fine
        elif repeat[1] == "da capo":
            offset = measures_list[-1][-1].offset
            if startsin0 and there_is_fine and not repeat_bracket:
                f += 1
            dacapo_list = _measure_ranges(
                part_measures,
                0 if startsin0 else 1,
                f - 1 if there_is_fine else len(part_measures),
                iteration=2 if repeat_bracket else None,
                offset=offset + compass,
                remove_repetition_marks_flag=True,
            )
            measures_list.append(dacapo_list)
    return measures_list
def _expand_part(part, repeat_elements: list):
    """
    Build a new Part whose measures follow the playing order of ``part``.

    The measures are laid out by ``_get_measures_list`` and then renumbered
    consecutively, starting at 0 when the first measure is numbered 0 and at 1
    otherwise.

    Parameters
    ----------
    part: music21 Part
        Part to expand.
    repeat_elements: list
        List containing all repetition elements.

    Returns
    -------
    music21 Part
        New part carrying the id and name of ``part``.
    """
    source_measures = _get_instrument_elements(
        part.elements
    )  # the measures as written, repetitions not yet unfolded
    new_part = m21.stream.Part()
    new_part.id = part.id
    new_part.partName = part.partName

    sections = _get_measures_list(source_measures, repeat_elements)
    unfolded = list(itertools.chain.from_iterable(sections))

    # Assign a new continuous measure number to every measure
    first_number = 0 if unfolded[0].measureNumber == 0 else 1
    renumbered = []
    for number, old in enumerate(unfolded, start=first_number):
        fresh = m21.stream.Measure(number=number)
        fresh.elements = old.elements
        fresh.offset = old.offset
        fresh.quarterLength = old.quarterLength
        renumbered.append(fresh)

    new_part.elements = renumbered
    return new_part
def _get_timesignature_periods(time_signatures: list):
# I don't get what this function should do
# TODO: Comprobar para cuando haya repeticiones, que al volver usa el beat del compas que toca.
periods = [0]
if len(time_signatures) == 0:
return periods
for t in range(0, len(time_signatures)):
if time_signatures[t] != time_signatures[t - 1]:
# when t is 0, t-1 is -1, is this what we want?
# if len(time_signatures) == 1, then it never enters here
periods.append(
t - periods[-1]
) # Substract indexes in case measures are not cointinuous
# if len(time_signatures) == 1, then we add 0 again
# if we entered the if, we add the same value twice
periods.append(t - periods[-1])
# at the end, periods is a list of indices of time_dignatures... but see next
# comment
return periods
def _calculate_total_number_of_beats(time_signatures: list, periods: list) -> int:
return sum(
[
# here, time_signature is indexed by j, and not by the content of period...
# is this correct? shouldn't thay be the opposite?
# when j == 0, period is also == 0, so it can be skipped
period * get_number_of_beats(time_signatures[period])
# j * get_number_of_beats(time_signatures[period])
for j, period in enumerate(periods)
]
)