import datetime import json import time import os import warnings from collections import defaultdict, Counter from logging.config import valid_ident from traceback import print_tb from typing import Dict, List, Any, Tuple, Union from pathlib import Path from modules.civit.client import Client class DatamodelBuilderSimple: def __init__(self): self.field_analysis = {} self.field_analysis_low_ram: dict[str, int] = dict() @staticmethod def _get_json_files(directory_path: str) -> List[str]: """Получает список всех JSON файлов в директории""" json_files = [] for filename in os.listdir(directory_path): if filename.endswith('.json'): json_files.append(os.path.join(directory_path, filename)) return json_files @staticmethod def _load_json_data(file_path: str) -> List[Dict]: """Загружает данные из JSON файла""" try: with open(file_path, 'r', encoding='utf-8') as f: data = json.load(f) if isinstance(data, list): return data else: return [data] except (json.JSONDecodeError, IOError) as e: print(f"Ошибка чтения файла {file_path}: {e}") return [] def _collect_all_entities(self, directory_path: str) -> List[Dict]: """Собирает все экземпляры из всех JSON файлов""" all_entities = [] json_files = self._get_json_files(directory_path) for file_path in json_files: entities = self._load_json_data(file_path) all_entities.extend(entities) return all_entities @staticmethod def _get_field_types(value: Any) -> str: """Определяет тип значения""" if isinstance(value, dict): return 'dict' elif isinstance(value, list): return 'list' elif isinstance(value, bool): return 'bool' elif isinstance(value, int): return 'int' elif isinstance(value, float): return 'float' elif isinstance(value, str): return 'str' else: return 'unknown' @staticmethod def _get_main_type(types: List[str]) -> str: """Определяет основной тип из списка типов""" if not types: return 'unknown' # Если есть dict или list - это сложная структура if 'dict' in types or 'list' in types: return 'complex' # Иначе возвращаем первый тип (или объединяем) unique_types = set(types) if len(unique_types) == 1: return types[0] else: return 'mixed' @staticmethod def _is_hashable(value: Any) -> bool: """Проверяет, является ли значение хэшируемым""" try: hash(value) return True except TypeError: return False @classmethod def _serialize_value_for_counter(cls, value: Any) -> str: """Преобразует значение в строку для использования в Counter""" if cls._is_hashable(value): return value else: # Для нехэшируемых типов используем строковое представление return str(value) def _analyze_fields_recursive(self, entity: Dict, parent_path: str, field_types: Dict, field_presence: Dict, field_values: Dict, top_n: int): """Рекурсивно анализирует поля сущности""" if not isinstance(entity, dict): return for key, value in entity.items(): field_path = f"{parent_path}.{key}" if parent_path else key # Добавляем тип поля field_types[field_path].append(self._get_field_types(value)) # Отмечаем наличие поля field_presence[field_path].append(True) # Сохраняем значение для подсчета частоты (обрабатываем нехэшируемые типы) if value is not None: serialized_value = self._serialize_value_for_counter(value) field_values[field_path].append(serialized_value) # Рекурсивно анализируем вложенные структуры if isinstance(value, dict): self._analyze_fields_recursive(value, field_path, field_types, field_presence, field_values, top_n) elif isinstance(value, list): for item in value: if isinstance(item, dict): self._analyze_fields_recursive(item, field_path, field_types, field_presence, field_values, top_n) def _analyze_entity_structure(self, entities: List[Dict], top_n: int) -> Dict[str, Any]: """Анализирует структуру всех сущностей""" if not entities: return {} # Собираем все поля и их типы field_types = defaultdict(list) field_presence = defaultdict(list) field_values = defaultdict(list) for entity in entities: self._analyze_fields_recursive(entity, "", field_types, field_presence, field_values, top_n) # Формируем финальный анализ result = {} for field_path, types in field_types.items(): # Определяем основной тип main_type = self._get_main_type(types) # Подсчитываем частоту наличия поля presence_count = len(field_presence[field_path]) total_count = len(entities) always_present = presence_count == total_count # Получаем топ N значений top_values = [] if field_path in field_values: try: # Преобразуем строки обратно в оригинальные типы для отображения value_counter = Counter(field_values[field_path]) top_values = [item[0] for item in value_counter.most_common(top_n)] except Exception: # Если возникла ошибка, используем пустой список top_values = [] result[field_path] = { 'type': main_type, 'always_present': always_present, 'top_values': top_values, 'total_count': total_count, 'presence_count': presence_count } return result def analyze_directory(self, directory_path: str, top_n: int = 10) -> Dict[str, Any]: """ Основной метод анализа директории Args: directory_path: Путь к директории с JSON файлами top_n: Количество самых частых значений для каждого поля Returns: Словарь с анализом структуры данных """ # Шаг 1: Собираем все экземпляры из JSON файлов entities = self._collect_all_entities(directory_path) # Шаг 2: Анализируем структуру сущностей self.field_analysis = self._analyze_entity_structure(entities, top_n) return self.field_analysis def analyze_directory_low_ram(self, directory_path: str, dump = None): json_files = self._get_json_files(directory_path) i = 0 files_count = len(json_files) for file_path in json_files: i += 1 print(f'processing file {i} of {files_count}: {file_path}') entities = self._load_json_data(file_path) for entity in entities: self.analyze_recursive_low_ram(entity) # del entity, entities sorted_items = sorted(self.field_analysis_low_ram.items(), key=lambda item: item[1]) result = [f'{item[0]} => {item[1]}' for item in sorted_items] if dump: with open(dump, 'w') as f: for res in result: f.write(res + '\n') for res in result: print(res) def analyze_recursive_low_ram(self, entity: dict, prefix = ''): for key, value in entity.items(): if not isinstance(value, list): value = [value] for v in value: if isinstance(v, dict): self.analyze_recursive_low_ram(v, prefix=prefix + key + '.') else: self.field_analysis_low_ram[prefix + key] = self.field_analysis_low_ram.get(prefix + key, 0) + 1 # del v # del key, value if __name__ == '__main__': d = DatamodelBuilderSimple() d.analyze_directory_low_ram(input("Directory path: "), input("Dump file path: "))