import os
from collections import Counter
from pathlib import PurePath
from typing import Union
import pandas as pd
from pandas import DataFrame
from tqdm import tqdm
from musif.config import PostProcess_Configuration
from musif.extract.basic_modules.scoring.constants import INSTRUMENTATION
from musif.extract.constants import ID, WINDOW_ID
from musif.extract.features.core.constants import FILE_NAME
from musif.extract.features.harmony.constants import (
HARMONY_AVAILABLE,
KEY_MODULATORY,
KEY_PREFIX,
)
from musif.extract.features.prefix import get_part_prefix, get_sound_prefix
from musif.logs import perr, pinfo
from musif.process.constants import PRESENCE
from musif.process.utils import (
delete_columns,
join_keys,
join_keys_modulatory,
join_part_degrees,
)
class DataProcessor:
    """Post-process the feature table produced by ``FeaturesExtractor``.

    The processor reads the extracted information either from a DataFrame
    or from a .csv file, removes the columns that are not useful for
    analysis, optionally groups related columns and normalises missing
    values. The main method ``process()`` returns the resulting DataFrame;
    ``save()`` writes it to disk.

    Requires to have a labels file in ./internal_data directory containing
    each label assigned to each score.

    Attributes
    ----------
    data : DataFrame
        DataFrame extracted with FeaturesExtractor containing all info.
    info : str, PurePath or DataFrame
        Path to a .csv file, or DataFrame, containing the information from
        FeaturesExtractor.
    destination_route : str
        Input path without its ``.csv`` suffix. Only set when ``info`` is
        a path.
    internal_data_dir
        Value of ``internal_data`` taken from the post-process
        configuration.

    Methods
    -------
    process()
        Processes all the DataFrame information.
    delete_files_without_harmony()
        Drops the rows whose harmonic analysis is not available.
    group_columns()
        Groups those columns related to Keys, Key_Modulatory and Degree
        for aggregated analysis.
    unbundle_instrumentation()
        Separates the 'Instrumentation' column into several Presence_
        columns, one for every instrument present in Instrumentation.
    delete_undesired_columns(**kwargs)
        Deletes all columns that are not needed according to the
        configuration.
    replace_nans()
        Replaces NaN by 0 in the columns selected in the configuration.
    save(dest_path, ft="csv")
        Saves the processed information to ``<dest_path>_alldata.csv``.
    """

    def __init__(self, info: Union[str, DataFrame], *args, **kwargs):
        """
        Parameters
        ----------
        info : Union[str, DataFrame]
            Either a path to a .csv file containing the information or a
            DataFrame object from FeaturesExtractor.
        *args
            Could be a path to a .yml file, a PostProcess_Configuration
            object or a dictionary. Length zero or one. Forwarded to
            ``PostProcess_Configuration``.
        **kwargs
            Keyword arguments forwarded to ``PostProcess_Configuration``.

        Raises
        ------
        OSError
            If ``info`` is a path and the file is missing, unreadable or
            contains no data (``FileNotFoundError`` for the missing and
            empty cases).
        """
        self._post_config = PostProcess_Configuration(*args, **kwargs)
        self.info = info
        self.data = self._process_info(self.info)
        self.internal_data_dir = self._post_config.internal_data

    def _process_info(self, info: Union[str, PurePath, DataFrame]) -> DataFrame:
        """
        Load the information from a .csv file or take it from a DataFrame.

        Parameters
        ----------
        info : str, PurePath or DataFrame
            Path to a .csv file, or the DataFrame itself.

        Raises
        ------
        FileNotFoundError
            If the path does not exist or the file holds no rows.
        OSError
            Any other I/O error raised while reading the file. The error is
            logged and then re-raised.

        Returns
        -------
        DataFrame
            The information to be processed. An empty DataFrame is
            returned (after logging an error) when ``info`` has an
            unsupported type.
        """
        if isinstance(info, DataFrame):
            pinfo("\nProcessing DataFrame...")
            return info

        if not isinstance(info, (str, PurePath)):
            perr(
                "Wrong info type! You must introduce either a DataFrame either the name of a .csv file."
            )
            return pd.DataFrame()

        try:
            pinfo("\nReading csv file...")
            if not os.path.exists(info):
                raise FileNotFoundError("A .csv file could not be found")
            if isinstance(info, PurePath):
                self.destination_route = str(info.with_suffix(""))
            elif info.endswith(".csv"):
                # Strip only the trailing extension; str.replace would also
                # mangle any ".csv" appearing earlier in the path.
                self.destination_route = info[: -len(".csv")]
            else:
                self.destination_route = info
            df = pd.read_csv(
                info, low_memory=False, sep=",", encoding_errors="replace"
            )
            if df.empty:
                raise FileNotFoundError("The .csv file contains no data.")
            return df
        except OSError as e:
            perr(
                f"Data could not be loaded. Either wrong path or an empty file was found. {e}"
            )
            # Propagate instead of returning the exception object, which
            # would end up stored in ``self.data`` and fail much later.
            raise

    def process(self) -> DataFrame:
        """
        Main method of the class. Removes columns that are entirely NaN,
        deletes unuseful columns and merges those that are needed according
        to the configuration.

        Returns
        -------
        DataFrame
            The processed data (also available as ``self.data``).
        """
        pinfo("\nPreprocessing data...")
        self.data.dropna(axis=1, how="all", inplace=True)
        if self._post_config.delete_files_without_harmony:
            self.delete_files_without_harmony()
        if self._post_config.unbundle_instrumentation:
            pinfo('\nSeparating "Instrumentation" column...')
            self.unbundle_instrumentation()
        self.delete_undesired_columns()
        if self._post_config.grouped_analysis:
            self.group_columns()
        self._final_data_processing()
        return self.data

    def delete_files_without_harmony(self):
        """
        Deletes files (actually rows in the DataFrame) that didn't have a
        proper harmonic analysis and, therefore, got a value of 0 in the
        'Harmony_Available' column.

        Does nothing when that column is not present.
        """
        if HARMONY_AVAILABLE not in self.data:
            return
        without_harmony = self.data[HARMONY_AVAILABLE] == 0
        number_files = int(without_harmony.sum())
        pinfo(
            f"{number_files} files were found without mscx analysis or errors in harmonic analysis. They'll be deleted."
        )
        pinfo(f"{self.data.loc[without_harmony, FILE_NAME].to_string()}")
        # Boolean filtering is positional, so it is safe even when the index
        # holds duplicated labels. The copy avoids chained-assignment
        # warnings in the later in-place steps.
        self.data = self.data.loc[~without_harmony].copy()

    def group_columns(self) -> None:
        """
        Groups Key_*_PercentageMeasures, Key_Modulatory and Degrees columns
        into bigger groups for aggregated analysis, keeping the previous
        ones. Also deletes unnecessary columns for analysis.

        A ``KeyError`` raised by any grouping step is logged and stops the
        remaining steps; it is not propagated.
        """
        try:
            self._group_keys_modulatory()
            self._group_keys()
            self._join_degrees()
            self._join_degrees_relative()
        except KeyError:
            perr("Some columns to group could not be found.")

    def unbundle_instrumentation(self) -> None:
        """
        Separates the Instrumentation column into as many columns as
        instruments present in Instrumentation, assigning a value of 1 for
        every instrument that is present and 0 if it is not, for every row
        (aria).

        Rows whose Instrumentation value is not a string (e.g. NaN) get 0
        in every presence column. If the Instrumentation column is missing
        an error is logged and nothing is changed.
        """
        if INSTRUMENTATION not in self.data:
            perr(f"Column '{INSTRUMENTATION}' was not found in the DataFrame.")
            return

        prefix = PRESENCE + "_"
        values = self.data[INSTRUMENTATION].tolist()

        # One positional boolean mask per instrument. Working by position
        # rather than by index label keeps this correct after rows have
        # been dropped or re-ordered.
        presence = {}
        for position, value in enumerate(values):
            if not isinstance(value, str):
                continue
            for element in value.split(","):
                if not element:
                    continue
                column = prefix + element
                if column not in presence:
                    presence[column] = [False] * len(values)
                presence[column][position] = True

        for column, mask in presence.items():
            if column not in self.data:
                self.data[column] = 0
            self.data.loc[mask, column] = 1

        presence_columns = [
            c for c in self.data.columns if isinstance(c, str) and prefix in c
        ]
        if presence_columns:
            self.data[presence_columns] = (
                self.data[presence_columns].fillna(0).astype(int)
            )

    def delete_undesired_columns(self, **kwargs) -> None:
        """Deletes not necessary columns for statistical analysis.

        If keyword arguments are passed in, they overwrite those found
        in the configuration file.

        Parameters
        ----------
        **kwargs : optional
            Any value from config.yml can be overwritten by passing
            arguments to the method.

        Notes
        -----
        A ``KeyError`` raised while deleting (a column that is already
        absent) is logged and not propagated.
        """
        config_data = self._post_config.to_dict_post()
        config_data.update(kwargs)  # Override values
        try:
            delete_columns(self.data, config_data)
        except KeyError:
            perr("Some columns are already not present in the Dataframe")

    def replace_nans(self) -> None:
        """
        Replaces NaN by 0 in every column whose name contains one of the
        substrings listed in the ``replace_nans`` configuration entry.

        Does nothing when that entry is empty or ``None``.
        """
        substrings = tuple(self._post_config.replace_nans or ())
        if not substrings:
            return
        for col in tqdm(
            self.data.columns, desc="Replacing NaN values in selected columns"
        ):
            if isinstance(col, str) and any(
                substring in col for substring in substrings
            ):
                self.data[col] = self.data[col].fillna(0)

    def save(self, dest_path: Union[str, PurePath], ft="csv") -> None:
        """Saves current information into a file given the name of dest_path

        To load one of those files, remember to set the index to
        `musif.extract.constant.ID`, and, if windows are used, to
        `musif.extract.constant.WINDOW_ID`:

        ```python
        df = pd.read_csv('window_alldata.csv').set_index(['Id', 'WindowId'])
        ```

        Parameters
        ----------
        dest_path : str or Path
            Prefix of the output file; the suffix `_alldata.csv` is always
            appended, whatever the value of `ft`.
        ft : str
            Type of file for saving. The filetype must be supported by
            `pandas`, e.g. `to_csv`, `to_feather`, `to_parquet`, etc.

        Raises
        ------
        AttributeError
            If `DataFrame` has no `to_<ft>` writer.
        """
        import inspect

        pinfo(f"Writing data to {dest_path}_*.csv")
        writer_name = "to_" + ft
        target = str(dest_path) + "_alldata.csv"
        writer = getattr(self.data, writer_name)
        if "index" in inspect.signature(writer).parameters:
            writer(target, index=False)
        else:
            # Writers such as to_feather or to_pickle have no ``index``
            # argument; dropping the index beforehand has the same effect.
            getattr(self.data.reset_index(drop=True), writer_name)(target)

    def _group_keys_modulatory(self) -> None:
        """Fill NaN with 0 in the modulatory key columns and join them."""
        modulatory_columns = [
            i for i in self.data.columns if KEY_PREFIX + KEY_MODULATORY in i
        ]
        self.data.update(self.data[modulatory_columns].fillna(0))
        join_keys_modulatory(self.data)

    def _group_keys(self) -> None:
        """Fill NaN with 0 in the key columns and join them."""
        key_columns = [i for i in self.data.columns if KEY_PREFIX in i]
        self.data.update(self.data[key_columns].fillna(0))
        join_keys(self.data)

    def _join_degrees(self) -> None:
        """Join the non-relative degree columns of each kept part and of the voice."""
        total_degrees = [
            i for i in self.data.columns if "_Degree" in i and "relative" not in i
        ]
        for part in self._post_config.instruments_to_keep:
            join_part_degrees(total_degrees, get_part_prefix(part), self.data)
        join_part_degrees(total_degrees, get_sound_prefix("voice"), self.data)

    def _join_degrees_relative(self) -> None:
        """Join the relative degree columns of each kept part and of the voice."""
        total_degrees = [
            i for i in self.data.columns if "_Degree" in i and "relative" in i
        ]
        for part in self._post_config.instruments_to_keep:
            join_part_degrees(
                total_degrees, get_part_prefix(part), self.data, sufix="_relative"
            )
        join_part_degrees(
            total_degrees, get_sound_prefix("voice"), self.data, sufix="_relative"
        )

    def _final_data_processing(self) -> None:
        """Sort rows, normalise missing values and sort columns by name."""
        # Sort only by the identifier columns that are actually present so
        # a table lacking one of them does not raise KeyError.
        sort_keys = [key for key in (ID, WINDOW_ID) if key in self.data.columns]
        if sort_keys:
            self.data.sort_values(sort_keys, inplace=True)
        self.replace_nans()
        self.data = self._check_columns_type(self.data)
        self.data = self.data.reindex(sorted(self.data.columns), axis=1)
        self.data.drop("index", axis=1, inplace=True, errors="ignore")

    def _check_columns_type(self, df) -> DataFrame:
        """
        Make the missing/zero markers of each column match its main type.

        For every column the most frequent Python type among the non-null
        values is computed. In string columns the number 0 becomes "0" and
        NaN becomes "NA"; in any other column the string "0" becomes the
        number 0 and NaN is left untouched. Columns without any non-null
        value are skipped.

        Parameters
        ----------
        df : DataFrame
            Table to adjust; its columns are reassigned in place.

        Returns
        -------
        DataFrame
            The same ``df`` object.
        """
        for column in tqdm(df.columns, desc="Adjusting NaN values"):
            present = df[column].dropna()
            if present.empty:
                # No value to infer a type from (most_common would be empty).
                continue
            column_type = Counter(present.map(type)).most_common(1)[0][0]
            if column_type is str:
                df[column] = df[column].replace(0, "0")
                df[column] = df[column].fillna(str("NA"))
            else:
                df[column] = df[column].replace("0", 0)
        return df