vaiola/modules/civit/fetch.py

import datetime
import json
import time
import os
from collections import defaultdict, Counter
from typing import Dict, List, Any, Tuple, Union
from pathlib import Path
from modules.civit.client import Client


class EntityAnalyzer:
    def __init__(self):
        self.field_analysis = {}

    def _get_json_files(self, directory_path: str) -> List[str]:
        """Returns the list of all JSON files in the directory"""
        json_files = []
        for filename in os.listdir(directory_path):
            if filename.endswith('.json'):
                json_files.append(os.path.join(directory_path, filename))
        return json_files

    def _load_json_data(self, file_path: str) -> List[Dict]:
        """Loads data from a JSON file"""
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
                if isinstance(data, list):
                    return data
                else:
                    return [data]
        except (json.JSONDecodeError, IOError) as e:
            print(f"Error reading file {file_path}: {e}")
            return []
    def _collect_all_entities(self, directory_path: str) -> List[Dict]:
        """Collects all records from all JSON files"""
        all_entities = []
        json_files = self._get_json_files(directory_path)
        for file_path in json_files:
            entities = self._load_json_data(file_path)
            all_entities.extend(entities)
        return all_entities
    def _get_field_types(self, value: Any) -> str:
        """Determines the type name of a value"""
        if isinstance(value, dict):
            return 'dict'
        elif isinstance(value, list):
            return 'list'
        elif isinstance(value, bool):
            return 'bool'
        elif isinstance(value, int):
            return 'int'
        elif isinstance(value, float):
            return 'float'
        elif isinstance(value, str):
            return 'str'
        else:
            return 'unknown'
    def _get_main_type(self, types: List[str]) -> str:
        """Determines the main type from a list of observed types"""
        if not types:
            return 'unknown'
        # A dict or list anywhere means a nested structure
        if 'dict' in types or 'list' in types:
            return 'complex'
        # Otherwise return the single type, or 'mixed' if several occur
        unique_types = set(types)
        if len(unique_types) == 1:
            return types[0]
        else:
            return 'mixed'
    def _is_hashable(self, value: Any) -> bool:
        """Checks whether a value is hashable"""
        try:
            hash(value)
            return True
        except TypeError:
            return False

    def _serialize_value_for_counter(self, value: Any) -> Any:
        """Converts a value into a form usable as a Counter key"""
        if self._is_hashable(value):
            return value
        else:
            # Fall back to the string representation for unhashable types
            return str(value)
    def _analyze_fields_recursive(self, entity: Dict, parent_path: str,
                                  field_types: Dict, field_presence: Dict,
                                  field_values: Dict, top_n: int):
        """Recursively analyzes the fields of an entity"""
        if not isinstance(entity, dict):
            return
        for key, value in entity.items():
            field_path = f"{parent_path}.{key}" if parent_path else key
            # Record the field type
            field_types[field_path].append(self._get_field_types(value))
            # Mark the field as present
            field_presence[field_path].append(True)
            # Store the value for frequency counting (handling unhashable types)
            if value is not None:
                serialized_value = self._serialize_value_for_counter(value)
                field_values[field_path].append(serialized_value)
            # Recurse into nested structures
            if isinstance(value, dict):
                self._analyze_fields_recursive(value, field_path, field_types,
                                               field_presence, field_values, top_n)
            elif isinstance(value, list):
                for item in value:
                    if isinstance(item, dict):
                        self._analyze_fields_recursive(item, field_path, field_types,
                                                       field_presence, field_values, top_n)
    def _analyze_entity_structure(self, entities: List[Dict], top_n: int) -> Dict[str, Any]:
        """Analyzes the structure of all entities"""
        if not entities:
            return {}
        # Collect all fields and their types
        field_types = defaultdict(list)
        field_presence = defaultdict(list)
        field_values = defaultdict(list)
        for entity in entities:
            self._analyze_fields_recursive(entity, "", field_types, field_presence,
                                           field_values, top_n)
        # Build the final analysis
        result = {}
        for field_path, types in field_types.items():
            # Determine the main type
            main_type = self._get_main_type(types)
            # Count how often the field is present
            presence_count = len(field_presence[field_path])
            total_count = len(entities)
            always_present = presence_count == total_count
            # Collect the top N most frequent values
            top_values = []
            if field_path in field_values:
                try:
                    value_counter = Counter(field_values[field_path])
                    top_values = [item[0] for item in value_counter.most_common(top_n)]
                except Exception:
                    # On any counting error, fall back to an empty list
                    top_values = []
            result[field_path] = {
                'type': main_type,
                'always_present': always_present,
                'top_values': top_values,
                'total_count': total_count,
                'presence_count': presence_count
            }
        return result
    def analyze_directory(self, directory_path: str, top_n: int = 10) -> Dict[str, Any]:
        """
        Main entry point for directory analysis

        Args:
            directory_path: Path to the directory with JSON files
            top_n: Number of most frequent values to keep for each field

        Returns:
            Dictionary describing the data structure
        """
        # Step 1: collect all records from the JSON files
        entities = self._collect_all_entities(directory_path)
        # Step 2: analyze the structure of the entities
        self.field_analysis = self._analyze_entity_structure(entities, top_n)
        return self.field_analysis
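
# Usage sketch for EntityAnalyzer (the directory name and field path below are
# illustrative assumptions, not taken from this repo):
#
#     analyzer = EntityAnalyzer()
#     schema = analyzer.analyze_directory('some_fetch_dir', top_n=5)
#     # schema maps dotted field paths to summaries, e.g. something like
#     # schema['stats.downloadCount'] == {'type': 'int', 'always_present': True,
#     #                                   'top_values': [...], 'total_count': ...,
#     #                                   'presence_count': ...}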


class Fetch:
    @staticmethod
    def load_json_dir(directory_path):
        """
        Takes a directory path, finds every JSON file in it,
        reads the lists of dicts they contain and returns one list with all the dicts

        Args:
            directory_path (str): Path to the directory with JSON files

        Returns:
            list: List of all dicts from all JSON files
        """
        all_dicts = []
        files = os.listdir(directory_path)
        # Walk over every file in the directory
        for filename in files:
            if filename.endswith('.json'):
                file_path = os.path.join(directory_path, filename)
                try:
                    with open(file_path, 'r', encoding='utf-8') as f:
                        data = json.load(f)
                        # The data is a list of dicts
                        if isinstance(data, list):
                            all_dicts.extend(data)
                        # The data is a single dict
                        elif isinstance(data, dict):
                            all_dicts.append(data)
                except (json.JSONDecodeError, IOError) as e:
                    print(f"Error reading file {filename}: {e}")
                    continue
        return all_dicts
    entities = {
        'creator': 'fetch_creators',
        'creators': 'fetch_creators',
        'tag': 'fetch_tags',
        'tags': 'fetch_tags',
        'model': 'fetch_models',
        'models': 'fetch_models',
        'image': 'fetch_images',
        'images': 'fetch_images',
    }

    @classmethod
    def load(cls, client: Client, entity_type: str):
        if entity_type in cls.entities:
            subdir = cls.entities[entity_type]
        else:
            raise ValueError(f'Civit doesn\'t have entity type {entity_type}')
        res = cls.load_json_dir(str(Path(client.path) / subdir))
        return res
    @classmethod
    def datamodel(cls, client: Client, subdir, top=None):
        if not top:
            top = 10
        path = Path(client.path) / subdir
        datamodel = EntityAnalyzer().analyze_directory(path, top_n=top)
        return datamodel
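    # Usage sketch (how Client is constructed is defined in modules.civit.client
    # and only assumed here):
    #
    #     creators = Fetch.load(client, 'creators')         # reads <client.path>/fetch_creators/*.json
    #     schema = Fetch.datamodel(client, 'fetch_models')  # field-level summary, top 10 values per field
    #
    # `load` accepts both singular and plural entity names ('model'/'models', etc.)
    # and raises ValueError for anything outside the `entities` mapping above.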
    @classmethod
    def _save_json(cls, path, items):
        with open(path, 'w') as f:
            json.dump(items, f, indent=2, ensure_ascii=False)
    @classmethod
    def _paginated_crawler_parse_metadata(cls, page):
        metadata = page.get('metadata', None)
        if not metadata:
            raise RuntimeError("Unable to find metadata")
        total_pages = metadata.get('totalPages', None)
        current_page = metadata.get('currentPage', None)
        if not total_pages or not current_page:
            raise RuntimeError("Unable to parse metadata")
        print(f"Found! Total pages: {total_pages}")
        return total_pages, current_page
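    # Expected shape of the page-numbered response this parser consumes
    # (illustrative values, inferred only from the keys read above):
    #
    #     {"items": [...], "metadata": {"totalPages": 42, "currentPage": 1, ...}}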
    @classmethod
    def _paginated_crawler(cls, client: Client, entity: str, save=True):
        items = list()
        print(f"Fetching {entity}...")
        path = Path(client.path) / ('fetch_' + entity)
        first_page = client.get_creators_tags_raw(entity)
        if first_page.get('items', None):
            items.extend(first_page.get('items', None))
        if save:
            path.mkdir(exist_ok=True)
            cls._save_json(path / 'first.json', items)
        total_pages, current_page = cls._paginated_crawler_parse_metadata(first_page)
        for i in range(2, total_pages + 1):
            print(f"Fetching page {i} of {total_pages}")
            page = client.get_creators_tags_raw(entity, page=i)
            time.sleep(3)
            page_items = page.get('items', None)
            if not page_items:
                continue
            items.extend(page_items)
            if save:
                cls._save_json(path / f'page_{i}.json', page_items)
        if save:
            cls._save_json(path / 'all.json', items)
        return items
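    # On-disk layout produced by the paginated crawler when save=True:
    #
    #     <client.path>/fetch_<entity>/first.json     items of page 1
    #     <client.path>/fetch_<entity>/page_<i>.json  items of page i (i >= 2)
    #     <client.path>/fetch_<entity>/all.json       everything combined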
    @classmethod
    def creators(cls, client: Client, subdir='fetch_creators', save=True):
        return cls._paginated_crawler(client, 'creators', save)

    @classmethod
    def tags(cls, client: Client, subdir='fetch_tags', save=True):
        return cls._paginated_crawler(client, 'tags', save)
    @classmethod
    def _cursor_crawler_parse_metadata(cls, page):
        metadata = page.get('metadata', None)
        if not metadata:
            raise RuntimeError("Unable to find metadata")
        next_page = metadata.get('nextPage', None)
        next_cursor = metadata.get('nextCursor', None)
        if not next_page or not next_cursor:
            raise RuntimeError("Unable to parse metadata")
        return next_page, next_cursor
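    # Expected shape of the cursor-based response this parser consumes
    # (illustrative values, inferred only from the keys read above):
    #
    #     {"items": [...],
    #      "metadata": {"nextCursor": "<opaque cursor>",
    #                   "nextPage": "<base_url>/api/v1/<entity>?...&cursor=<opaque cursor>"}}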
    @classmethod
    def _cursor_crawler_avoid_slip(cls, client: Client, url, path, entity, slip_retries=5, chill_time=3):
        slip_counter = 0
        while True:
            page = client.make_get_request(url).json()
            try:
                next_page, next_cursor = cls._cursor_crawler_parse_metadata(page)
            except RuntimeError:
                return page
            if slip_counter >= slip_retries:
                break
            if next_page == url:
                # The API returned the same nextPage again: log the slip and retry
                slip_counter = slip_counter + 1
                with open(Path(path) / 'slip.log', 'a') as file:
                    file.write(f'{url}\n')
                continue
            else:
                return page
        if entity not in {'models'}:
            raise RuntimeError("Slip detected! Avoiding failed: NotImplemented")
        # Bump the numeric tail of the cursor to step over the stuck page
        split = url.rsplit('.', 1)
        prefix = split[0] + '.'
        split = split[1].split('%', 1)
        suffix = '%' + split[1]
        num = int(split[0])
        if num < 999:
            num = num + 1
        else:
            raise RuntimeError("Slip avoiding failed: Number overflow")
        url = prefix + f'{num:03d}' + suffix
        page = client.make_get_request(url).json()
        next_page, next_cursor = cls._cursor_crawler_parse_metadata(page)
        if next_page != url:
            return page
        else:
            raise RuntimeError("Slip avoiding failed: Not effective")
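    # Note on the slip workaround above (an interpretation of the code, not documented
    # API behaviour): it assumes the models cursor at the end of `url` looks roughly
    # like "...<number>.<three-digit sub-number>%7C...", i.e. the text after the URL's
    # last '.' is a small integer followed by a URL-encoded separator. Incrementing
    # that sub-number nudges the crawl past a cursor that keeps returning itself
    # as its own nextPage.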
    @classmethod
    def _cursor_crawler(cls, client: Client, entity: str, params: dict, save=True):
        print(f"Fetching {entity}...")
        path = Path(client.path) / ('fetch_' + entity)
        items = list()
        url = f'{client.config.base_url}/api/v1/{entity}{client.build_query_string(params)}'
        first_page = client.make_get_request(url)
        if not first_page:
            with open(Path(client.path) / 'bugs.log', 'a') as f:
                f.write(url + '\n')
            return items
        first_page = first_page.json()
        if first_page.get('items', None):
            items.extend(first_page.get('items', None))
        if save:
            path.mkdir(exist_ok=True)
            cls._save_json(path / 'first.json', items)
        try:
            next_page, next_cursor = cls._cursor_crawler_parse_metadata(first_page)
        except RuntimeError:
            return items
        while next_page:
            time.sleep(3)
            page = cls._cursor_crawler_avoid_slip(client, next_page, path, entity)
            page_items = page.get('items', None)
            if not page_items:
                with open(Path(client.path) / 'bugs.log', 'a') as f:
                    f.write(next_page + '\n')
                return items
            l = len(items)
            items.extend(page_items)
            print(f"Fetched {len(items) - l}/{len(page_items)} {entity} from page {next_page}")
            if save:
                cls._save_json(path / f'page_{next_cursor}.json', page_items)
            try:
                next_page, next_cursor = cls._cursor_crawler_parse_metadata(page)
            except RuntimeError:
                break
        if save:
            cls._save_json(path / 'all.json', items)
        return items
    @classmethod
    def models(cls, client: Client, subdir='fetch_models', save=True):
        return cls._cursor_crawler(client, 'models', {'period': 'AllTime', 'sort': 'Oldest', 'nsfw': 'true'}, save)
    @classmethod
    def images(cls, client: Client, subdir='fetch_images', save=True, start_with=None):
        items = list()
        if not start_with:
            start_with = 0
        path = Path(client.path) / ('fetch_' + 'images')
        if save:
            path.mkdir(exist_ok=True)
        creators = [c.get('username', None) for c in cls.load(client, 'creators')]
        counter = 1 + int(start_with)
        for username in creators[int(start_with):]:
            # for username in ['yonne']:
            time.sleep(3)
            if not username:
                continue
            page_items = cls._cursor_crawler(client, 'images',
                                             {'period': 'AllTime', 'sort': 'Oldest', 'nsfw': 'X',
                                              'username': username, 'limit': '200'}, save=False)
            if len(page_items) >= 49000:
                # Very prolific creators: re-crawl with several sort orders and deduplicate by image id
                with open(path / '_giants.log', 'a') as f:
                    f.write(username + '\n')
                print(f'Giant {username} has more than {len(page_items)} images, starting deep scan')
                page_items_dict = dict()
                for item in page_items:
                    page_items_dict[item['id']] = item
                print(f'Transferred {len(page_items_dict)} images of {len(page_items)}')
                for sort in ['Newest', 'Most Reactions', 'Most Comments', 'Most Collected']:
                    page_items = cls._cursor_crawler(client, 'images',
                                                     {'period': 'AllTime', 'sort': sort, 'nsfw': 'X',
                                                      'username': username, 'limit': '200'}, save=False)
                    l = len(page_items_dict)
                    for item in page_items:
                        page_items_dict[item['id']] = item
                    print(f'Added {len(page_items_dict) - l} images by {sort} sort crawl. {len(page_items_dict)} images total')
                page_items = [value for key, value in page_items_dict.items()]
            l = len(items)
            # items.extend(page_items)
            print(f"Fetched {len(page_items)} images by {username} ({counter}/{len(creators)})")
            counter = counter + 1
            if save:
                cls._save_json(path / f'{username}.json', page_items)
        # if save: cls._save_json(path / 'aaa.json', items)
        return items
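

# Overall usage sketch (a hedged example: how Client is constructed is defined in
# modules.civit.client and only assumed here):
#
#     client = Client(...)     # configured with config.base_url and a working path
#     Fetch.creators(client)   # page-numbered crawl -> <path>/fetch_creators/
#     Fetch.tags(client)       # page-numbered crawl -> <path>/fetch_tags/
#     Fetch.models(client)     # cursor crawl        -> <path>/fetch_models/
#     Fetch.images(client)     # per-creator crawl   -> <path>/fetch_images/<username>.json
#     schema = Fetch.datamodel(client, 'fetch_models', top=5)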