Files
ktransformers/kt-kernel/python/cli/utils/repo_detector.py
Oql 56cbd69ac4 kt-cli enhancement (#1834)
* [feat]: redesign kt run interactive configuration with i18n support

- Redesign kt run with 8-step interactive flow (model selection, inference method, NUMA/CPU, GPU experts, KV cache, GPU/TP selection, parsers, host/port)
- Add configuration save/load system (~/.ktransformers/run_configs.yaml)
- Add i18n support for kt chat (en/zh translations)
- Add universal input validators with auto-retry and Chinese comma support
- Add port availability checker with auto-suggestion
- Add parser configuration (--tool-call-parser, --reasoning-parser)
- Remove tuna command and clean up redundant files
- Fix: variable reference bug in run.py, filter to show only MoE models

* [feat]: unify model selection UI and enable shared experts fusion by default

- Unify kt run model selection table with kt model list display
  * Add Total size, MoE Size, Repo, and SHA256 status columns
  * Use consistent formatting and styling
  * Improve user decision-making with more information

- Enable --disable-shared-experts-fusion by default
  * Change default value from False to True
  * Users can still override with --enable-shared-experts-fusion

* [feat]: improve kt chat with performance metrics and better CJK support

- Add performance metrics display after each response
  * Total time, TTFT (Time To First Token), TPOT (Time Per Output Token)
  * Accurate input/output token counts using model tokenizer
  * Fallback to estimation if tokenizer unavailable
  * Metrics shown in dim style (not prominent)

- Fix Chinese character input issues
  * Replace Prompt.ask() with console.input() for better CJK support
  * Fixes backspace deletion showing half-characters

- Suppress NumPy subnormal warnings
  * Filter "The value of the smallest subnormal" warnings
  * Cleaner CLI output on certain hardware environments

* [fix]: correct TTFT measurement in kt chat

- Move start_time initialization before API call
- Previously start_time was set when receiving first chunk, causing TTFT ≈ 0ms
- Now correctly measures time from request sent to first token received

* [docs]: 添加 Clawdbot 集成指南 - KTransformers 企业级 AI 助手部署方案

* [docs]: 强调推荐使用 Kimi K2.5 作为核心模型,突出企业级推理能力

* [docs]: 添加 Clawdbot 飞书接入教程链接

* [feat]: improve CLI table display, model verification, and chat experience

- Add sequence number (#) column to all model tables by default
- Filter kt edit to show only MoE GPU models (exclude AMX)
- Extend kt model verify to check *.json and *.py files in addition to weights
- Fix re-verification bug where repaired files caused false failures
- Suppress tokenizer debug output in kt chat token counting

* [fix]: fix cpu cores.

---------

Co-authored-by: skqliao <skqliao@gmail.com>
2026-02-04 16:44:54 +08:00

365 lines
10 KiB
Python

"""
Repo Detector
Automatically detect repository information from model README.md files
"""
import re
from pathlib import Path
from typing import Optional, Dict, Tuple
import yaml
def parse_readme_frontmatter(readme_path: Path) -> Optional[Dict]:
    """
    Parse the YAML frontmatter at the top of a README.md file.

    The frontmatter is the text between an opening ``---`` line at the very
    start of the file and the next ``---`` line. The closing marker may be
    the last line of the file (no trailing newline required), and a leading
    UTF-8 byte-order mark is ignored.

    Args:
        readme_path: Path to README.md file

    Returns:
        Dictionary of frontmatter data, or None if the file is missing or
        unreadable, has no frontmatter, fails to parse, or the parsed YAML
        is not a mapping.
    """
    if not readme_path.exists():
        return None

    try:
        # "utf-8-sig" drops a leading BOM; with plain "utf-8" the BOM would
        # sit in front of the opening "---" and defeat the anchored match.
        with open(readme_path, "r", encoding="utf-8-sig") as f:
            content = f.read()
    except (OSError, UnicodeError):
        # Unreadable file (permissions, directory, bad encoding): no metadata.
        return None

    # Match YAML frontmatter between --- markers. The closing marker is
    # followed either by a newline or by the end of the text.
    match = re.match(r"^---\s*\n(.*?)\n---\s*(?:\n|\Z)", content, re.DOTALL)
    if not match:
        return None

    try:
        data = yaml.safe_load(match.group(1))
    except Exception:
        # Deliberately best-effort: any parse failure means "no metadata".
        # NOTE(review): kept broad because YAML value constructors may raise
        # errors other than yaml.YAMLError (e.g. ValueError for an impossible
        # date) - confirm before narrowing.
        return None

    return data if isinstance(data, dict) else None
def extract_repo_from_frontmatter(frontmatter: Dict) -> Optional[Tuple[str, str]]:
    """
    Extract repo_id and repo_type from frontmatter

    Lookup order:
      1. ``license_link`` - parsed as a URL via ``_extract_repo_from_url``.
      2. ``base_model`` (a string, or each entry of a list, in order).
      3. ``name`` of the first ``model-index`` entry.
      4. ``model_name``.

    For steps 2-4 the first value that looks like ``namespace/model-name``
    (exactly one "/", both sides non-empty) is used; values that do not fit
    that shape are skipped so a later field can still supply the repo id.

    Args:
        frontmatter: Parsed YAML frontmatter dictionary

    Returns:
        Tuple of (repo_id, repo_type) or None
        repo_type is either "huggingface" or "modelscope"
    """
    if not frontmatter or not isinstance(frontmatter, dict):
        return None

    # Priority 1: Extract from license_link (most reliable)
    license_link = frontmatter.get("license_link")
    if license_link and isinstance(license_link, str):
        result = _extract_repo_from_url(license_link)
        if result:
            return result

    # Priority 2: gather candidate ids from the other fields, in order.
    candidates = []

    base_model = frontmatter.get("base_model")
    if isinstance(base_model, str):
        candidates.append(base_model)
    elif isinstance(base_model, list):
        candidates.extend(base_model)

    model_index = frontmatter.get("model-index")
    if isinstance(model_index, list) and model_index:
        first_model = model_index[0]
        if isinstance(first_model, dict):
            candidates.append(first_model.get("name"))

    candidates.append(frontmatter.get("model_name"))

    # Validate format: should be "namespace/model-name" with both parts
    # present, so "/model", "org/" and "a/b/c" are all rejected.
    repo_id = None
    for candidate in candidates:
        if not isinstance(candidate, str):
            continue
        namespace, sep, model_name = candidate.strip().partition("/")
        if sep and namespace and model_name and "/" not in model_name:
            repo_id = f"{namespace}/{model_name}"
            break

    if repo_id is None:
        return None

    # Determine repo type: HuggingFace unless something points at ModelScope.
    repo_type = "huggingface"
    if "modelscope" in repo_id.lower():
        repo_type = "modelscope"

    tags = frontmatter.get("tags", [])
    if isinstance(tags, list) and any(str(tag).lower() == "modelscope" for tag in tags):
        repo_type = "modelscope"

    return (repo_id, repo_type)
def _extract_repo_from_url(url: str) -> Optional[Tuple[str, str]]:
"""
Extract repo_id and repo_type from a URL
Supports:
- https://huggingface.co/Qwen/Qwen3-30B-A3B/blob/main/LICENSE
- https://modelscope.cn/models/Qwen/Qwen3-30B-A3B
Args:
url: URL string
Returns:
Tuple of (repo_id, repo_type) or None
"""
# HuggingFace pattern: https://huggingface.co/{namespace}/{model}/...
hf_match = re.match(r"https?://huggingface\.co/([^/]+)/([^/]+)", url)
if hf_match:
namespace = hf_match.group(1)
model_name = hf_match.group(2)
repo_id = f"{namespace}/{model_name}"
return (repo_id, "huggingface")
# ModelScope pattern: https://modelscope.cn/models/{namespace}/{model}
ms_match = re.match(r"https?://(?:www\.)?modelscope\.cn/models/([^/]+)/([^/]+)", url)
if ms_match:
namespace = ms_match.group(1)
model_name = ms_match.group(2)
repo_id = f"{namespace}/{model_name}"
return (repo_id, "modelscope")
return None
def extract_repo_from_global_search(readme_path: Path) -> Optional[Tuple[str, str]]:
    """
    Extract repo info by globally searching for URLs in README.md

    All HuggingFace URLs are collected first (in document order), then all
    ModelScope URLs, and the last collected entry is returned. Consequently,
    when the README contains any ModelScope URL, the last ModelScope URL
    wins; otherwise the last HuggingFace URL does.

    Args:
        readme_path: Path to README.md file

    Returns:
        Tuple of (repo_id, repo_type) or None if not found
    """
    if not readme_path.exists():
        return None

    try:
        with open(readme_path, "r", encoding="utf-8") as f:
            content = f.read()
    except (OSError, UnicodeError):
        # Unreadable README: treat as "nothing found".
        return None

    # A path segment ends at "/", whitespace, a query/fragment marker, or any
    # delimiter that commonly wraps a URL in Markdown/HTML, so that
    # href="...">text or (...) does not bleed into the model name.
    segment = r"""([^/\s?#()\[\]<>|"'`]+)"""
    hf_pattern = r"https?://huggingface\.co/" + segment + "/" + segment
    ms_pattern = r"https?://(?:www\.)?modelscope\.cn/models/" + segment + "/" + segment

    # Sentence punctuation / Markdown emphasis glued to the end of a URL.
    trailing_junk = ".,;:!*"

    found_repos = []

    for namespace, model_name in re.findall(hf_pattern, content):
        # Skip common non-repo paths
        if namespace.lower() in ("docs", "blog", "spaces", "datasets"):
            continue
        model_name = model_name.rstrip(trailing_junk)
        if not model_name:
            continue
        if model_name.lower() in ("tree", "blob", "raw", "resolve", "discussions"):
            continue
        found_repos.append((f"{namespace}/{model_name}", "huggingface"))

    for namespace, model_name in re.findall(ms_pattern, content):
        model_name = model_name.rstrip(trailing_junk)
        if not model_name:
            continue
        found_repos.append((f"{namespace}/{model_name}", "modelscope"))

    # If multiple repos were found, use the last collected one.
    return found_repos[-1] if found_repos else None
def detect_repo_for_model(model_path: str) -> Optional[Tuple[str, str]]:
    """
    Work out which repository a local model directory came from.

    Strategy:
        Read README.md inside the model directory and use only its YAML
        frontmatter metadata. There is no fallback to a global URL search
        (that was removed to avoid false positives).

    Args:
        model_path: Path to model directory

    Returns:
        Tuple of (repo_id, repo_type), or None when the path is not an
        existing directory, has no README.md, or yields no usable
        frontmatter.
    """
    root = Path(model_path)
    if not (root.exists() and root.is_dir()):
        return None

    readme = root / "README.md"
    if not readme.exists():
        return None

    # Frontmatter only - deliberately no global-search fallback.
    metadata = parse_readme_frontmatter(readme)
    return extract_repo_from_frontmatter(metadata) if metadata else None
def scan_models_for_repo(model_list) -> Dict:
    """
    Run repo detection over a list of models and bucket the outcomes.

    A model is skipped when it already has a repo_id or when its format is
    not "safetensors" or "gguf"; every other model is passed to
    detect_repo_for_model() using its path.

    Args:
        model_list: List of UserModel objects

    Returns:
        Dictionary with scan results:
        {
            'detected': [(model, repo_id, repo_type), ...],
            'not_detected': [model, ...],
            'skipped': [model, ...]
        }
    """
    detected, not_detected, skipped = [], [], []

    for candidate in model_list:
        # Nothing to do for models that already carry a repo_id or whose
        # format is outside the supported set.
        if candidate.repo_id or candidate.format not in ("safetensors", "gguf"):
            skipped.append(candidate)
            continue

        info = detect_repo_for_model(candidate.path)
        if not info:
            not_detected.append(candidate)
        else:
            found_id, found_type = info
            detected.append((candidate, found_id, found_type))

    return {"detected": detected, "not_detected": not_detected, "skipped": skipped}
def format_detection_report(results: Dict) -> str:
    """
    Render scan results as a plain-text report.

    The report has a banner, one section per non-empty bucket (detected,
    not detected, skipped) and a closing summary line with the three counts.

    Args:
        results: Results from scan_models_for_repo()

    Returns:
        Formatted string report
    """
    rule = "=" * 80
    detected = results["detected"]
    not_detected = results["not_detected"]
    skipped = results["skipped"]

    report = [rule, "Auto-Detection Report", rule, ""]

    # Detected: name, path and the repo that was found.
    if detected:
        report += [f"✓ Detected repository information ({len(detected)} models):", ""]
        for model, repo_id, repo_type in detected:
            report += [
                f"{model.name}",
                f" Path: {model.path}",
                f" Repo: {repo_id} ({repo_type})",
                "",
            ]

    # Not detected: name and path only.
    if not_detected:
        report += [f"✗ No repository information found ({len(not_detected)} models):", ""]
        for model in not_detected:
            report += [f"{model.name}", f" Path: {model.path}", ""]

    # Skipped: only the count and the reason, no per-model listing.
    if skipped:
        report += [
            f"⊘ Skipped ({len(skipped)} models):",
            " (Already have repo_id or not safetensors/gguf format)",
            "",
        ]

    summary = (
        f"Summary: {len(detected)} detected, "
        f"{len(not_detected)} not detected, "
        f"{len(skipped)} skipped"
    )
    report += [rule, summary, rule]

    return "\n".join(report)
def apply_detection_results(results: Dict, registry) -> int:
    """
    Write detected repo information back to the registry.

    For every entry in results["detected"], registry.update_model() is
    called with the model's name and a dict holding "repo_id" and
    "repo_type"; calls that return a truthy value are counted.

    Args:
        results: Results from scan_models_for_repo()
        registry: UserModelRegistry instance

    Returns:
        Number of models updated
    """
    outcomes = (
        registry.update_model(entry.name, {"repo_id": found_id, "repo_type": found_type})
        for entry, found_id, found_type in results["detected"]
    )
    # Every update is attempted, in order; only the successful ones count.
    return sum(1 for ok in outcomes if ok)