From b1c0145e440b45acd2abee483f84a3816d0dae02 Mon Sep 17 00:00:00 2001
From: Max Luebbering <2804731+le1nux@users.noreply.github.com>
Date: Fri, 17 Apr 2026 13:20:36 +0200
Subject: [PATCH 1/3] feat: added logging for chunk switches and initial positions in CombinedDataset

---
 src/modalities/config/config.py      |  2 ++
 src/modalities/dataloader/dataset.py | 11 ++++++++++-
 2 files changed, 12 insertions(+), 1 deletion(-)

diff --git a/src/modalities/config/config.py b/src/modalities/config/config.py
index 13a37103b..060679582 100644
--- a/src/modalities/config/config.py
+++ b/src/modalities/config/config.py
@@ -448,6 +448,8 @@ class PackedMemMapDatasetMegatronConfig(BaseModel):
 
 class CombinedDatasetConfig(BaseModel):
     datasets: list[PydanticDatasetIFType]
+    log_chunk_switch: bool = False
+    log_initial_pos: bool = False
 
 
 class BatchSamplerConfig(BaseModel):
diff --git a/src/modalities/dataloader/dataset.py b/src/modalities/dataloader/dataset.py
index 0ef4d9076..a98df8b8f 100644
--- a/src/modalities/dataloader/dataset.py
+++ b/src/modalities/dataloader/dataset.py
@@ -14,6 +14,7 @@
 from modalities.dataloader.create_packed_data import EmbeddedStreamData
 from modalities.dataloader.large_file_lines_reader import LargeFileLinesReader
 from modalities.tokenization.tokenizer_wrapper import TokenizerWrapper
+from modalities.utils.logger_utils import get_logger
 
 
 class Dataset(TorchdataSet):
@@ -445,14 +446,17 @@ class CombinedDataset(Dataset):
     In the Dataloader, a batch will still contain packed samples from different datasets.
     """
 
-    def __init__(self, datasets: list[Dataset]):
+    def __init__(self, datasets: list[Dataset], log_chunk_switch: bool = False, log_initial_pos: bool = False):
         """Initializes the CombinedDataset object, combining multiple datasets.
 
         Args:
             datasets (list[Dataset]): A list of datasets to combine.
         """
+        self.log_chunk_switch = log_chunk_switch
+        self.log_initial_pos = log_initial_pos
         self.datasets = datasets
         self.cumulative_sizes = np.cumsum([len(ds) for ds in datasets], dtype=np.int64)
+        self.logger = get_logger(__name__)
 
     def __len__(self) -> int:
         return self.cumulative_sizes[-1]
@@ -460,5 +464,10 @@ def __len__(self) -> int:
     def __getitem__(self, idx: int) -> dict:
         dataset_idx = np.searchsorted(self.cumulative_sizes, idx, side="right")
         local_idx = idx - (self.cumulative_sizes[dataset_idx - 1] if dataset_idx > 0 else 0)
+        if self.log_chunk_switch and local_idx == 0:
+            self.logger.info(f"global_index={idx} chunk index={dataset_idx}, local index={local_idx}")
+
+        if self.log_initial_pos:
+            self.logger.info(f"global_index={idx} chunk index={dataset_idx}, local index={local_idx}")
 
         return self.datasets[dataset_idx][local_idx]

From 9189224c635798e750d97560df65b8135c538cf6 Mon Sep 17 00:00:00 2001
From: Max Luebbering <2804731+le1nux@users.noreply.github.com>
Date: Fri, 17 Apr 2026 14:36:29 +0200
Subject: [PATCH 2/3] refactor: Added logging options to CombinedDataset and its factory method

---
 src/modalities/dataloader/dataset.py         | 3 +++
 src/modalities/dataloader/dataset_factory.py | 9 +++++++--
 2 files changed, 10 insertions(+), 2 deletions(-)

diff --git a/src/modalities/dataloader/dataset.py b/src/modalities/dataloader/dataset.py
index a98df8b8f..2d84f26fd 100644
--- a/src/modalities/dataloader/dataset.py
+++ b/src/modalities/dataloader/dataset.py
@@ -451,6 +451,9 @@ def __init__(self, datasets: list[Dataset], log_chunk_switch: bool = False, log_
 
         Args:
             datasets (list[Dataset]): A list of datasets to combine.
+            log_chunk_switch (bool, optional): Whether to log when switching between dataset chunks. Defaults to False.
+            log_initial_pos (bool, optional): Whether to log the initial position at the beginning of a training
+                or warmstart. Defaults to False.
         """
         self.log_chunk_switch = log_chunk_switch
         self.log_initial_pos = log_initial_pos
diff --git a/src/modalities/dataloader/dataset_factory.py b/src/modalities/dataloader/dataset_factory.py
index 1eab9e328..8539c5d3a 100644
--- a/src/modalities/dataloader/dataset_factory.py
+++ b/src/modalities/dataloader/dataset_factory.py
@@ -116,13 +116,18 @@ def get_packed_mem_map_dataset_megatron(
         return dataset
 
     @staticmethod
-    def get_combined_dataset(datasets: list[Dataset]) -> Dataset:
+    def get_combined_dataset(
+        datasets: list[Dataset], log_chunk_switch: bool = False, log_initial_pos: bool = False
+    ) -> Dataset:
         """Factory method for creating a combined dataset.
 
         Args:
             datasets (list[Dataset]): List of datasets to combine.
+            log_chunk_switch (bool, optional): Whether to log when switching between dataset chunks. Defaults to False.
+            log_initial_pos (bool, optional): Whether to log the initial position at the beginning of a
+                training or warmstart. Defaults to False.
 
         Returns:
             Dataset: CombinedDataset object.
         """
-        return CombinedDataset(datasets=datasets)
+        return CombinedDataset(datasets=datasets, log_chunk_switch=log_chunk_switch, log_initial_pos=log_initial_pos)

From 864eae28aa41fb8aa0c2a25eae6a192e46481987 Mon Sep 17 00:00:00 2001
From: Max Luebbering <2804731+le1nux@users.noreply.github.com>
Date: Fri, 17 Apr 2026 14:51:43 +0200
Subject: [PATCH 3/3] chore: improved logging in CombinedDataset for chunk switches and initial position

---
 src/modalities/dataloader/dataset.py | 14 +++++++++++---
 1 file changed, 11 insertions(+), 3 deletions(-)

diff --git a/src/modalities/dataloader/dataset.py b/src/modalities/dataloader/dataset.py
index 2d84f26fd..e4e26f02b 100644
--- a/src/modalities/dataloader/dataset.py
+++ b/src/modalities/dataloader/dataset.py
@@ -457,6 +457,7 @@ def __init__(self, datasets: list[Dataset], log_chunk_switch: bool = False, log_
         """
         self.log_chunk_switch = log_chunk_switch
         self.log_initial_pos = log_initial_pos
+        self.already_logged_initial_pos = False
         self.datasets = datasets
         self.cumulative_sizes = np.cumsum([len(ds) for ds in datasets], dtype=np.int64)
         self.logger = get_logger(__name__)
@@ -468,9 +469,16 @@ def __getitem__(self, idx: int) -> dict:
         dataset_idx = np.searchsorted(self.cumulative_sizes, idx, side="right")
         local_idx = idx - (self.cumulative_sizes[dataset_idx - 1] if dataset_idx > 0 else 0)
         if self.log_chunk_switch and local_idx == 0:
-            self.logger.info(f"global_index={idx} chunk index={dataset_idx}, local index={local_idx}")
+            self.logger.info(
+                f"Chunk switch: global_index={idx}/{len(self)} chunk index={dataset_idx}/{len(self.datasets)}, "
+                f"local index={local_idx}/{len(self.datasets[dataset_idx])}"
+            )
 
-        if self.log_initial_pos:
-            self.logger.info(f"global_index={idx} chunk index={dataset_idx}, local index={local_idx}")
+        if self.log_initial_pos and not self.already_logged_initial_pos:
+            self.logger.info(
+                f"Initial pos: global_index={idx}/{len(self)} chunk index={dataset_idx}/{len(self.datasets)}, "
+                f"local index={local_idx}/{len(self.datasets[dataset_idx])}"
+            )
+            self.already_logged_initial_pos = True
 
         return self.datasets[dataset_idx][local_idx]
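
For reference, below is a minimal, self-contained sketch of the index mapping and logging behaviour the three patches above give CombinedDataset. The toy list-based datasets, the logging.basicConfig setup, and the driver loop are illustrative stand-ins only; the real class wraps Dataset instances and obtains its logger via modalities.utils.logger_utils.get_logger.

import logging

import numpy as np

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("combined_dataset_sketch")

# Toy stand-ins for the chunked datasets combined by CombinedDataset.
datasets = [["a0", "a1", "a2"], ["b0", "b1"], ["c0", "c1", "c2", "c3"]]
cumulative_sizes = np.cumsum([len(ds) for ds in datasets], dtype=np.int64)
total = int(cumulative_sizes[-1])

log_chunk_switch = True
log_initial_pos = True
already_logged_initial_pos = False

for idx in range(total):
    # Map the global sample index to (chunk index, local index), as in CombinedDataset.__getitem__.
    dataset_idx = int(np.searchsorted(cumulative_sizes, idx, side="right"))
    local_idx = idx - (int(cumulative_sizes[dataset_idx - 1]) if dataset_idx > 0 else 0)

    # Logged once per chunk: the first sample drawn from each dataset.
    if log_chunk_switch and local_idx == 0:
        logger.info(
            f"Chunk switch: global_index={idx}/{total} chunk index={dataset_idx}/{len(datasets)}, "
            f"local index={local_idx}/{len(datasets[dataset_idx])}"
        )

    # Logged once per run: the very first position that is accessed.
    if log_initial_pos and not already_logged_initial_pos:
        logger.info(
            f"Initial pos: global_index={idx}/{total} chunk index={dataset_idx}/{len(datasets)}, "
            f"local index={local_idx}/{len(datasets[dataset_idx])}"
        )
        already_logged_initial_pos = True

    sample = datasets[dataset_idx][local_idx]

Both flags default to False in CombinedDatasetConfig, so existing configs keep the previous silent behaviour; per the docstrings, log_initial_pos is meant to show where a training run or warmstart begins, while log_chunk_switch marks each dataset boundary as it is crossed.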