mirror of
https://github.com/Physton/sd-webui-prompt-all-in-one.git
synced 2026-05-01 03:31:41 +00:00
Created
Former-commit-id: e255ced1c91ef6a7722dce7268872be9ee326168
This commit is contained in:
BIN
scripts/__pycache__/get_i18n.cpython-310.pyc
Normal file
BIN
scripts/__pycache__/get_i18n.cpython-310.pyc
Normal file
Binary file not shown.
BIN
scripts/__pycache__/get_translate_apis.cpython-310.pyc
Normal file
BIN
scripts/__pycache__/get_translate_apis.cpython-310.pyc
Normal file
Binary file not shown.
BIN
scripts/__pycache__/translate.cpython-310.pyc
Normal file
BIN
scripts/__pycache__/translate.cpython-310.pyc
Normal file
Binary file not shown.
11
scripts/get_extensions.py
Normal file
11
scripts/get_extensions.py
Normal file
@@ -0,0 +1,11 @@
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
def get_extensions():
|
||||
extends_dir = os.path.join(Path().absolute(), 'extensions')
|
||||
extends = []
|
||||
for name in os.listdir(extends_dir):
|
||||
path = os.path.join(extends_dir, name)
|
||||
if os.path.isdir(path):
|
||||
extends.append(name)
|
||||
return extends
|
||||
14
scripts/get_i18n.py
Normal file
14
scripts/get_i18n.py
Normal file
@@ -0,0 +1,14 @@
|
||||
import os
|
||||
import json
|
||||
|
||||
i18n = {}
|
||||
def get_i18n(reload=False):
|
||||
global i18n
|
||||
if reload or not i18n:
|
||||
i18n = {}
|
||||
current_dir = os.path.dirname(os.path.abspath(__file__))
|
||||
config_file = os.path.join(current_dir, '../i18n.json')
|
||||
config_file = os.path.normpath(config_file)
|
||||
with open(config_file, 'r', encoding='utf8') as f:
|
||||
i18n = json.load(f)
|
||||
return i18n
|
||||
21
scripts/get_token_counter.py
Normal file
21
scripts/get_token_counter.py
Normal file
@@ -0,0 +1,21 @@
|
||||
from modules import script_callbacks, extra_networks, prompt_parser
|
||||
from modules.sd_hijack import model_hijack
|
||||
from functools import partial, reduce
|
||||
|
||||
def get_token_counter(text, steps):
|
||||
# copy from modules.ui.py
|
||||
try:
|
||||
text, _ = extra_networks.parse_prompt(text)
|
||||
|
||||
_, prompt_flat_list, _ = prompt_parser.get_multicond_prompt_list([text])
|
||||
prompt_schedules = prompt_parser.get_learned_conditioning_prompt_schedules(prompt_flat_list, steps)
|
||||
|
||||
except Exception:
|
||||
# a parsing error can happen here during typing, and we don't want to bother the user with
|
||||
# messages related to it in console
|
||||
prompt_schedules = [[[steps, text]]]
|
||||
|
||||
flat_prompts = reduce(lambda list1, list2: list1+list2, prompt_schedules)
|
||||
prompts = [prompt_text for step, prompt_text in flat_prompts]
|
||||
token_count, max_length = max([model_hijack.get_prompt_lengths(prompt) for prompt in prompts], key=lambda args: args[0])
|
||||
return {"token_count": token_count, "max_length": max_length}
|
||||
35
scripts/get_translate_apis.py
Normal file
35
scripts/get_translate_apis.py
Normal file
@@ -0,0 +1,35 @@
|
||||
import os
|
||||
import json
|
||||
# from scripts.storage import storage
|
||||
|
||||
translate_apis = {}
|
||||
# st = storage()
|
||||
def get_translate_apis(reload=False):
|
||||
global translate_apis
|
||||
global st
|
||||
if reload or not translate_apis:
|
||||
translate_apis = {}
|
||||
current_dir = os.path.dirname(os.path.abspath(__file__))
|
||||
config_file = os.path.join(current_dir, '../translate_apis.json')
|
||||
config_file = os.path.normpath(config_file)
|
||||
with open(config_file, 'r', encoding='utf8') as f:
|
||||
translate_apis = json.load(f)
|
||||
|
||||
# for group in translate_apis['apis']:
|
||||
# for item in group['children']:
|
||||
# if 'config' not in item:
|
||||
# continue
|
||||
# config_name = 'translate_api.' + item['key']
|
||||
# config = st.get(config_name)
|
||||
# if not config:
|
||||
# config = {}
|
||||
# for config_item in item['config']:
|
||||
# if config_item['key'] in config:
|
||||
# config_item['value'] = config[config_item['key']]
|
||||
# else:
|
||||
# if 'default' in config_item:
|
||||
# config_item['value'] = config_item['default']
|
||||
# else:
|
||||
# config_item['value'] = ''
|
||||
|
||||
return translate_apis
|
||||
152
scripts/history.py
Normal file
152
scripts/history.py
Normal file
@@ -0,0 +1,152 @@
|
||||
from scripts.storage import storage
|
||||
import uuid
|
||||
import time
|
||||
|
||||
class history:
|
||||
histoies = {
|
||||
'txt2img': [],
|
||||
'txt2img_neg': [],
|
||||
'img2img': [],
|
||||
'img2img_neg': [],
|
||||
}
|
||||
favorites = {
|
||||
'txt2img': [],
|
||||
'txt2img_neg': [],
|
||||
'img2img': [],
|
||||
'img2img_neg': [],
|
||||
}
|
||||
max = 100
|
||||
storage = storage()
|
||||
|
||||
def __init__(self):
|
||||
for type in self.histoies:
|
||||
self.histoies[type] = self.storage.get('history.' + type)
|
||||
if self.histoies[type] is None:
|
||||
self.histoies[type] = []
|
||||
self.__save_histories(type)
|
||||
|
||||
for type in self.favorites:
|
||||
self.favorites[type] = self.storage.get('favorite.' + type)
|
||||
if self.favorites[type] is None:
|
||||
self.favorites[type] = []
|
||||
self.__save_favorites(type)
|
||||
|
||||
def __save_histories(self, type):
|
||||
self.storage.set('history.' + type, self.histoies[type])
|
||||
|
||||
def __save_favorites(self, type):
|
||||
self.storage.set('favorite.' + type, self.favorites[type])
|
||||
|
||||
def get_histoies(self, type):
|
||||
histoies = self.histoies[type]
|
||||
for history in histoies:
|
||||
history['is_favorite'] = self.is_favorite(type, history['id'])
|
||||
return histoies
|
||||
|
||||
def is_favorite(self, type, id):
|
||||
for favorite in self.favorites[type]:
|
||||
if favorite['id'] == id:
|
||||
return True
|
||||
return False
|
||||
|
||||
def get_favorites(self, type):
|
||||
return self.favorites[type]
|
||||
|
||||
def push_history(self, type, tags, prompt, name=''):
|
||||
if len(self.histoies[type]) >= self.max:
|
||||
self.histoies[type].pop()
|
||||
item = {
|
||||
'id': str(uuid.uuid1()),
|
||||
'time': int(time.time()),
|
||||
'name': name,
|
||||
'tags': tags,
|
||||
'prompt': prompt,
|
||||
}
|
||||
self.histoies[type].append(item)
|
||||
self.__save_histories(type)
|
||||
return item
|
||||
|
||||
def get_latest_history(self, type):
|
||||
if len(self.histoies[type]) > 0:
|
||||
return self.histoies[type][-1]
|
||||
return None
|
||||
|
||||
def set_history(self, type, id, tags, prompt, name):
|
||||
for history in self.histoies[type]:
|
||||
if history['id'] == id:
|
||||
history['tags'] = tags
|
||||
history['prompt'] = prompt
|
||||
history['name'] = name
|
||||
self.__save_histories(type)
|
||||
if self.is_favorite(type, id):
|
||||
self.set_favorite(type, id, tags, prompt, name)
|
||||
return True
|
||||
return False
|
||||
|
||||
def set_favorite(self, type, id, tags, prompt, name):
|
||||
for favorite in self.favorites[type]:
|
||||
if favorite['id'] == id:
|
||||
favorite['tags'] = tags
|
||||
favorite['prompt'] = prompt
|
||||
favorite['name'] = name
|
||||
self.__save_favorites(type)
|
||||
return True
|
||||
return False
|
||||
|
||||
def set_history_name(self, type, id, name):
|
||||
for history in self.histoies[type]:
|
||||
if history['id'] == id:
|
||||
history['name'] = name
|
||||
self.__save_histories(type)
|
||||
for favorite in self.favorites[type]:
|
||||
if favorite['id'] == id:
|
||||
favorite['name'] = name
|
||||
self.__save_favorites(type)
|
||||
return True
|
||||
return False
|
||||
|
||||
def set_favorite_name(self, type, id, name):
|
||||
for favorite in self.favorites[type]:
|
||||
if favorite['id'] == id:
|
||||
favorite['name'] = name
|
||||
self.__save_favorites(type)
|
||||
for history in self.histoies[type]:
|
||||
if history['id'] == id:
|
||||
history['name'] = name
|
||||
self.__save_histories(type)
|
||||
return True
|
||||
return False
|
||||
|
||||
def dofavorite(self, type, id):
|
||||
if self.is_favorite(type, id):
|
||||
return False
|
||||
for history in self.histoies[type]:
|
||||
if history['id'] == id:
|
||||
self.favorites[type].append(history)
|
||||
self.__save_favorites(type)
|
||||
return True
|
||||
return False
|
||||
|
||||
def unfavorite(self, type, id):
|
||||
if not self.is_favorite(type, id):
|
||||
return False
|
||||
for favorite in self.favorites[type]:
|
||||
if favorite['id'] == id:
|
||||
self.favorites[type].remove(favorite)
|
||||
self.__save_favorites(type)
|
||||
return True
|
||||
return False
|
||||
|
||||
def remove_history(self, type, id):
|
||||
for history in self.histoies[type]:
|
||||
if history['id'] == id:
|
||||
self.histoies[type].remove(history)
|
||||
self.__save_histories(type)
|
||||
return True
|
||||
return False
|
||||
|
||||
def remove_histories(self, type):
|
||||
self.histoies[type] = []
|
||||
self.__save_histories(type)
|
||||
return True
|
||||
|
||||
186
scripts/on_app_started.py
Executable file
186
scripts/on_app_started.py
Executable file
@@ -0,0 +1,186 @@
|
||||
import gradio as gr
|
||||
import os
|
||||
from pathlib import Path
|
||||
from modules import script_callbacks, extra_networks, prompt_parser
|
||||
from fastapi import FastAPI, Body, Request
|
||||
from scripts.storage import storage
|
||||
from scripts.get_extensions import get_extensions
|
||||
from scripts.get_token_counter import get_token_counter
|
||||
from scripts.get_i18n import get_i18n
|
||||
from scripts.get_translate_apis import get_translate_apis
|
||||
from scripts.translate import translate
|
||||
from scripts.history import history
|
||||
|
||||
VERSION = '0.0.1'
|
||||
|
||||
def on_app_started(_: gr.Blocks, app: FastAPI):
|
||||
st = storage()
|
||||
hi = history()
|
||||
|
||||
@app.get("/physton_prompt/get_version")
|
||||
async def _get_version():
|
||||
return {"version": VERSION}
|
||||
|
||||
@app.get("/physton_prompt/get_config")
|
||||
async def _get_config():
|
||||
return {
|
||||
'i18n': get_i18n(True),
|
||||
'translate_apis': get_translate_apis(True),
|
||||
}
|
||||
|
||||
@app.get("/physton_prompt/get_extensions")
|
||||
async def _get_extensions():
|
||||
return {"extends": get_extensions()}
|
||||
|
||||
@app.post("/physton_prompt/token_counter")
|
||||
async def _token_counter(request: Request):
|
||||
data = await request.json()
|
||||
if 'text' not in data or 'steps' not in data:
|
||||
return {"success": False, "message": "text or steps is required"}
|
||||
return get_token_counter(data['text'], data['steps'])
|
||||
|
||||
@app.get("/physton_prompt/get_data")
|
||||
async def _get_data(key: str):
|
||||
return {"data": st.get(key)}
|
||||
|
||||
@app.get("/physton_prompt/get_datas")
|
||||
async def _get_datas(keys: str):
|
||||
keys = keys.split(',')
|
||||
datas = {}
|
||||
for key in keys:
|
||||
datas[key] = st.get(key)
|
||||
return {"datas": datas}
|
||||
|
||||
@app.post("/physton_prompt/set_data")
|
||||
async def _set_data(request: Request):
|
||||
data = await request.json()
|
||||
if 'key' not in data or 'data' not in data:
|
||||
return {"success": False, "message": "key or data is required"}
|
||||
st.set(data['key'], data['data'])
|
||||
return {"success": True}
|
||||
|
||||
@app.post("/physton_prompt/set_datas")
|
||||
async def _set_datas(request: Request):
|
||||
data = await request.json()
|
||||
if not isinstance(data, dict):
|
||||
return {"success": False, "message": "data is not dict"}
|
||||
for key in data:
|
||||
st.set(key, data[key])
|
||||
return {"success": True}
|
||||
|
||||
@app.get("/physton_prompt/get_data_list_item")
|
||||
async def _get_data_list_item(key: str, index: int):
|
||||
return {"item": st.list_get(key, index)}
|
||||
|
||||
@app.post("/physton_prompt/push_data_list")
|
||||
async def _push_data_list(request: Request):
|
||||
data = await request.json()
|
||||
if 'key' not in data or 'item' not in data:
|
||||
return {"success": False, "message": "key or item is required"}
|
||||
st.list_push(data['key'], data['item'])
|
||||
return {"success": True}
|
||||
|
||||
@app.post("/physton_prompt/pop_data_list")
|
||||
async def _pop_data_list(request: Request):
|
||||
data = await request.json()
|
||||
if 'key' not in data:
|
||||
return {"success": False, "message": "key is required"}
|
||||
return {"success": True, 'item': st.list_pop(data['key'])}
|
||||
|
||||
@app.post("/physton_prompt/shift_data_list")
|
||||
async def _shift_data_list(request: Request):
|
||||
data = await request.json()
|
||||
if 'key' not in data:
|
||||
return {"success": False, "message": "key is required"}
|
||||
return {"success": True, 'item': st.list_shift(data['key'])}
|
||||
|
||||
@app.post("/physton_prompt/remove_data_list")
|
||||
async def _remove_data_list(request: Request):
|
||||
data = await request.json()
|
||||
if 'key' not in data or 'index' not in data:
|
||||
return {"success": False, "message": "key or index is required"}
|
||||
st.list_remove(data['key'], data['index'])
|
||||
return {"success": True}
|
||||
|
||||
@app.post("/physton_prompt/clear_data_list")
|
||||
async def _clear_data_list(request: Request):
|
||||
data = await request.json()
|
||||
if 'key' not in data:
|
||||
return {"success": False, "message": "key is required"}
|
||||
st.list_clear(data['key'])
|
||||
return {"success": True}
|
||||
|
||||
@app.get("/physton_prompt/get_histories")
|
||||
async def _get_histories(type: str):
|
||||
return {"histories": hi.get_histoies(type)}
|
||||
|
||||
@app.get("/physton_prompt/get_favorites")
|
||||
async def _get_favorites(type: str):
|
||||
return {"favorites": hi.get_favorites(type)}
|
||||
|
||||
@app.post("/physton_prompt/push_history")
|
||||
async def _push_history(request: Request):
|
||||
data = await request.json()
|
||||
if 'type' not in data or 'tags' not in data or 'prompt' not in data:
|
||||
return {"success": False, "message": "type or tags or prompt is required"}
|
||||
hi.push_history(data['type'], data['tags'], data['prompt'], data.get('name', ''))
|
||||
return {"success": True}
|
||||
|
||||
@app.get("/physton_prompt/get_latest_history")
|
||||
async def _get_latest_history(type: str):
|
||||
return {"history": hi.get_latest_history(type)}
|
||||
|
||||
@app.post("/physton_prompt/set_history")
|
||||
async def _set_history(request: Request):
|
||||
data = await request.json()
|
||||
if 'type' not in data or 'id' not in data or 'tags' not in data or 'prompt' not in data or 'name' not in data:
|
||||
return {"success": False, "message": "type or id or tags or prompt is required"}
|
||||
return {"success": hi.set_history(data['type'], data['id'], data['tags'], data['prompt'], data['name'])}
|
||||
|
||||
@app.post("/physton_prompt/set_history_name")
|
||||
async def _set_history_name(request: Request):
|
||||
data = await request.json()
|
||||
if 'type' not in data or 'id' not in data or 'name' not in data:
|
||||
return {"success": False, "message": "type or id or name is required"}
|
||||
return {"success": hi.set_history_name(data['type'], data['id'], data['name'])}
|
||||
|
||||
@app.post("/physton_prompt/set_favorite_name")
|
||||
async def _set_favorite_name(request: Request):
|
||||
data = await request.json()
|
||||
if 'type' not in data or 'id' not in data or 'name' not in data:
|
||||
return {"success": False, "message": "type or id or name is required"}
|
||||
return {"success": hi.set_favorite_name(data['type'], data['id'], data['name'])}
|
||||
|
||||
@app.post("/physton_prompt/dofavorite")
|
||||
async def _dofavorite(request: Request):
|
||||
data = await request.json()
|
||||
if 'type' not in data or 'id' not in data:
|
||||
return {"success": False, "message": "type or id is required"}
|
||||
return {"success": hi.dofavorite(data['type'], data['id'])}
|
||||
|
||||
@app.post("/physton_prompt/unfavorite")
|
||||
async def _unfavorite(request: Request):
|
||||
data = await request.json()
|
||||
if 'type' not in data or 'id' not in data:
|
||||
return {"success": False, "message": "type or id is required"}
|
||||
return {"success": hi.unfavorite(data['type'], data['id'])}
|
||||
|
||||
@app.post("/physton_prompt/delete_history")
|
||||
async def _delete_history(request: Request):
|
||||
data = await request.json()
|
||||
if 'type' not in data or 'id' not in data:
|
||||
return {"success": False, "message": "type or id is required"}
|
||||
return {"success": hi.remove_history(data['type'], data['id'])}
|
||||
|
||||
@app.post("/physton_prompt/delete_histories")
|
||||
async def _delete_histories(request: Request):
|
||||
data = await request.json()
|
||||
if 'type' not in data:
|
||||
return {"success": False, "message": "type is required"}
|
||||
return {"success": hi.remove_histories(data['type'])}
|
||||
|
||||
@app.post("/physton_prompt/translate")
|
||||
async def _translate(text: str = Body(...), from_lang: str = Body(...), to_lang: str = Body(...), api: str = Body(...), api_config: dict = Body(...)):
|
||||
return translate(text, from_lang, to_lang, api, api_config)
|
||||
|
||||
script_callbacks.on_app_started(on_app_started)
|
||||
144
scripts/storage.py
Normal file
144
scripts/storage.py
Normal file
@@ -0,0 +1,144 @@
|
||||
import os
|
||||
from pathlib import Path
|
||||
import json
|
||||
import time
|
||||
|
||||
class storage:
|
||||
storage_path = ''
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
def __get_storage_path(self):
|
||||
self.get_storage_path = os.path.join(Path().absolute(), 'physton-prompt')
|
||||
if not os.path.exists(self.get_storage_path):
|
||||
os.makedirs(self.get_storage_path)
|
||||
return self.get_storage_path
|
||||
|
||||
def __get_data_filename(self, key):
|
||||
return self.__get_storage_path() + '/' + key + '.json'
|
||||
|
||||
def __get_key_lock_filename(self, key):
|
||||
return self.__get_storage_path() + '/' + key + '.lock'
|
||||
|
||||
def __lock(self, key):
|
||||
file_path = self.__get_key_lock_filename(key)
|
||||
with open(file_path, 'w') as f:
|
||||
f.write('1')
|
||||
|
||||
def __unlock(self, key):
|
||||
file_path = self.__get_key_lock_filename(key)
|
||||
if os.path.exists(file_path):
|
||||
os.remove(file_path)
|
||||
|
||||
def __is_locked(self, key):
|
||||
file_path = self.__get_key_lock_filename(key)
|
||||
return os.path.exists(file_path)
|
||||
|
||||
def __get(self, key):
|
||||
filename = self.__get_data_filename(key)
|
||||
if not os.path.exists(filename):
|
||||
return None
|
||||
with open(filename, 'r') as f:
|
||||
data = json.load(f)
|
||||
return data
|
||||
|
||||
def __set(self, key, data):
|
||||
file_path = self.__get_data_filename(key)
|
||||
with open(file_path, 'w') as f:
|
||||
json.dump(data, f, indent=4, ensure_ascii=False)
|
||||
|
||||
def set(self, key, data):
|
||||
while self.__is_locked(key):
|
||||
time.sleep(0.01)
|
||||
self.__lock(key)
|
||||
try:
|
||||
self.__set(key, data)
|
||||
self.__unlock(key)
|
||||
except Exception as e:
|
||||
self.__unlock(key)
|
||||
raise e
|
||||
|
||||
def get(self, key):
|
||||
return self.__get(key)
|
||||
|
||||
def delete(self, key):
|
||||
file_path = self.__get_data_filename(key)
|
||||
if os.path.exists(file_path):
|
||||
os.remove(file_path)
|
||||
|
||||
def __get_list(self, key):
|
||||
data = self.get(key)
|
||||
if not data:
|
||||
data = []
|
||||
return data
|
||||
|
||||
# 向列表中添加元素
|
||||
def list_push(self, key, item):
|
||||
while self.__is_locked(key):
|
||||
time.sleep(0.01)
|
||||
self.__lock(key)
|
||||
try:
|
||||
data = self.__get_list(key)
|
||||
data.append(item)
|
||||
self.__set(key, data)
|
||||
self.__unlock(key)
|
||||
except Exception as e:
|
||||
self.__unlock(key)
|
||||
raise e
|
||||
|
||||
# 从列表中删除和返回最后一个元素
|
||||
def list_pop(self, key):
|
||||
while self.__is_locked(key):
|
||||
time.sleep(0.01)
|
||||
self.__lock(key)
|
||||
try:
|
||||
data = self.__get_list(key)
|
||||
item = data.pop()
|
||||
self.__set(key, data)
|
||||
self.__unlock(key)
|
||||
return item
|
||||
except Exception as e:
|
||||
self.__unlock(key)
|
||||
raise e
|
||||
|
||||
# 从列表中删除和返回第一个元素
|
||||
def list_shift(self, key):
|
||||
while self.__is_locked(key):
|
||||
time.sleep(0.01)
|
||||
self.__lock(key)
|
||||
try:
|
||||
data = self.__get_list(key)
|
||||
item = data.pop(0)
|
||||
self.__set(key, data)
|
||||
self.__unlock(key)
|
||||
return item
|
||||
except Exception as e:
|
||||
self.__unlock(key)
|
||||
raise e
|
||||
|
||||
# 从列表中删除指定元素
|
||||
def list_remove(self, key, index):
|
||||
while self.__is_locked(key):
|
||||
time.sleep(0.01)
|
||||
self.__lock(key)
|
||||
data = self.__get_list(key)
|
||||
data.pop(index)
|
||||
self.__set(key, data)
|
||||
self.__unlock(key)
|
||||
|
||||
# 获取列表中指定位置的元素
|
||||
def list_get(self, key, index):
|
||||
data = self.__get_list(key)
|
||||
return data[index]
|
||||
|
||||
# 清空列表中的所有元素
|
||||
def list_clear(self, key):
|
||||
while self.__is_locked(key):
|
||||
time.sleep(0.01)
|
||||
self.__lock(key)
|
||||
try:
|
||||
self.__set(key, [])
|
||||
self.__unlock(key)
|
||||
except Exception as e:
|
||||
self.__unlock(key)
|
||||
raise e
|
||||
321
scripts/translate.py
Normal file
321
scripts/translate.py
Normal file
@@ -0,0 +1,321 @@
|
||||
from scripts.get_translate_apis import get_translate_apis
|
||||
import hashlib
|
||||
import os
|
||||
import requests
|
||||
import uuid
|
||||
import random
|
||||
import json
|
||||
import time
|
||||
|
||||
caches = {}
|
||||
|
||||
def translate_google(text, from_lang, to_lang, api_config):
|
||||
url = 'https://translation.googleapis.com/language/translate/v2/'
|
||||
api_key = api_config.get('api_key', '')
|
||||
if not api_key:
|
||||
raise Exception("api_key is required")
|
||||
params = {
|
||||
'key': api_key,
|
||||
'q': text,
|
||||
'source': from_lang,
|
||||
'target': to_lang,
|
||||
'format': 'text'
|
||||
}
|
||||
response = requests.get(url, params=params, timeout=10)
|
||||
result = response.json()
|
||||
if 'error' in result:
|
||||
raise Exception(result['error']['message'])
|
||||
return result['data']['translations'][0]['translatedText']
|
||||
|
||||
def translate_openai(text, from_lang, to_lang, api_config):
|
||||
import openai
|
||||
openai.api_key = api_config.get('api_key', '')
|
||||
model = api_config.get('model', 'gpt-3.5-turbo')
|
||||
if not openai.api_key:
|
||||
raise Exception("api_key is required")
|
||||
messages = [
|
||||
{"role": "system", "content": "You are a translator assistant."},
|
||||
{"role":
|
||||
"user",
|
||||
"content": f"You are a translator assistant. Please translate the following JSON data {to_lang}. Preserve the original format. Only return the translation result, without any additional content or annotations. If the prompt word is in the target language, please send it to me unchanged:\n{text}"
|
||||
},
|
||||
]
|
||||
completion = openai.ChatCompletion.create(model=model, messages=messages, timeout=10)
|
||||
if len(completion.choices) == 0:
|
||||
raise Exception("No response from OpenAI")
|
||||
content = completion.choices[0].message.content
|
||||
return content
|
||||
|
||||
def translate_microsoft(text, from_lang, to_lang, api_config):
|
||||
url = 'https://api.cognitive.microsofttranslator.com/translate'
|
||||
api_key = api_config.get('api_key', '')
|
||||
region = api_config.get('region', '')
|
||||
if not api_key:
|
||||
raise Exception("api_key is required")
|
||||
if not region:
|
||||
raise Exception("region is required")
|
||||
params = {
|
||||
'api-version': '3.0',
|
||||
'from': from_lang,
|
||||
'to': to_lang
|
||||
}
|
||||
headers = {
|
||||
'Ocp-Apim-Subscription-Key': api_key,
|
||||
'Ocp-Apim-Subscription-Region': region,
|
||||
'Content-type': 'application/json',
|
||||
'X-ClientTraceId': str(uuid.uuid4())
|
||||
}
|
||||
body = [{
|
||||
'text': text
|
||||
}]
|
||||
response = requests.post(url, params=params, headers=headers, json=body, timeout=10)
|
||||
result = response.json()
|
||||
if 'error' in result:
|
||||
raise Exception(result['error']['message'])
|
||||
if len(result) == 0:
|
||||
raise Exception("No response from Microsoft")
|
||||
return result[0]['translations'][0]['text']
|
||||
|
||||
def translate_amazon(text, from_lang, to_lang, api_config):
|
||||
import boto3
|
||||
api_key_id = api_config.get('api_key_id', '')
|
||||
api_key_secret = api_config.get('api_key_secret', '')
|
||||
region = api_config.get('region', '')
|
||||
if not api_key_id:
|
||||
raise Exception("api_key_id is required")
|
||||
if not api_key_secret:
|
||||
raise Exception("api_key_secret is required")
|
||||
if not region:
|
||||
raise Exception("region is required")
|
||||
|
||||
translate = boto3.client(service_name='translate', region_name=region, use_ssl=True, aws_access_key_id=api_key_id, aws_secret_access_key=api_key_secret)
|
||||
result = translate.translate_text(Text=text, SourceLanguageCode=from_lang, TargetLanguageCode=to_lang)
|
||||
if 'TranslatedText' not in result:
|
||||
raise Exception("No response from Amazon")
|
||||
return result['TranslatedText']
|
||||
|
||||
def translate_deepl(text, from_lang, to_lang, api_config):
|
||||
url = 'https://api-free.deepl.com/v2/translate'
|
||||
api_key = api_config.get('api_key', '')
|
||||
if not api_key:
|
||||
raise Exception("api_key is required")
|
||||
headers = {"Authorization": f"DeepL-Auth-Key {api_key}"}
|
||||
data = {
|
||||
'text': text,
|
||||
'source_lang': from_lang,
|
||||
'target_lang': to_lang
|
||||
}
|
||||
|
||||
response = requests.post(url, headers=headers, data=data, timeout=10)
|
||||
result = response.json()
|
||||
if 'message' in result:
|
||||
raise Exception(result['message'])
|
||||
if 'translations' not in result:
|
||||
raise Exception("No response from DeepL")
|
||||
return result['translations'][0]['text']
|
||||
|
||||
def translate_baidu(text, from_lang, to_lang, api_config):
|
||||
url = "https://fanyi-api.baidu.com/api/trans/vip/translate"
|
||||
app_id = api_config.get('app_id', '')
|
||||
app_secret = api_config.get('app_secret', '')
|
||||
if not app_id:
|
||||
raise Exception("app_id is required")
|
||||
if not app_secret:
|
||||
raise Exception("app_secret is required")
|
||||
salt = random.randint(32768, 65536)
|
||||
sign = app_id + text + str(salt) + app_secret
|
||||
sign = hashlib.md5(sign.encode()).hexdigest()
|
||||
params = {
|
||||
'q': text,
|
||||
'from': from_lang,
|
||||
'to': to_lang,
|
||||
'appid': app_id,
|
||||
'salt': salt,
|
||||
'sign': sign
|
||||
}
|
||||
response = requests.get(url, params=params, timeout=10)
|
||||
result = response.json()
|
||||
if 'error_code' in result:
|
||||
raise Exception(result['error_msg'])
|
||||
if 'trans_result' not in result:
|
||||
raise Exception("No response from Baidu")
|
||||
translated_text = ''
|
||||
for item in result['trans_result']:
|
||||
translated_text += "\n" + item['dst']
|
||||
return translated_text
|
||||
# return result['trans_result'][0]['dst']
|
||||
|
||||
def translate_youdao(text, from_lang, to_lang, api_config):
|
||||
url = "https://openapi.youdao.com/api"
|
||||
app_id = api_config.get('app_id', '')
|
||||
app_secret = api_config.get('app_secret', '')
|
||||
if not app_id:
|
||||
raise Exception("app_id is required")
|
||||
if not app_secret:
|
||||
raise Exception("app_secret is required")
|
||||
curtime = str(int(time.time()))
|
||||
salt = random.randint(32768, 65536)
|
||||
if(len(text) <= 20):
|
||||
input = text
|
||||
elif(len(text) > 20):
|
||||
input = text[:10] + str(len(text)) + text[-10:]
|
||||
sign = app_id + input + str(salt) + curtime + app_secret
|
||||
sign = hashlib.sha256(sign.encode()).hexdigest()
|
||||
params = {
|
||||
'q': text,
|
||||
'from': from_lang,
|
||||
'to': to_lang,
|
||||
'appKey': app_id,
|
||||
'salt': salt,
|
||||
'signType': 'v3',
|
||||
'curtime': curtime,
|
||||
'sign': sign
|
||||
}
|
||||
headers = {"Content-Type": "application/x-www-form-urlencoded; charset=UTF-8"}
|
||||
response = requests.post(url, params=params, timeout=10, headers=headers)
|
||||
result = response.json()
|
||||
if 'errorCode' not in result:
|
||||
raise Exception("No response from Youdao")
|
||||
if result['errorCode'] != '0':
|
||||
raise Exception(f'errorCode: {result["errorCode"]}')
|
||||
return result['translation'][0]
|
||||
|
||||
def translate_alibaba(text, from_lang, to_lang, api_config):
|
||||
access_key_id = api_config.get('access_key_id', '')
|
||||
access_key_secret = api_config.get('access_key_secret', '')
|
||||
region = api_config.get('region', 'cn-shanghai')
|
||||
if not access_key_id:
|
||||
raise Exception("access_key_id is required")
|
||||
if not access_key_secret:
|
||||
raise Exception("access_key_secret is required")
|
||||
if not region:
|
||||
raise Exception("region is required")
|
||||
|
||||
from aliyunsdkcore.client import AcsClient
|
||||
from aliyunsdkcore.acs_exception.exceptions import ClientException
|
||||
from aliyunsdkcore.acs_exception.exceptions import ServerException
|
||||
from aliyunsdkalimt.request.v20181012 import TranslateRequest
|
||||
|
||||
client = AcsClient(access_key_id, access_key_secret, region)
|
||||
request = TranslateRequest.TranslateRequest()
|
||||
request.set_SourceLanguage(from_lang)
|
||||
request.set_Scene("general")
|
||||
request.set_SourceText(text)
|
||||
request.set_FormatType("text") #翻译文本的格式
|
||||
request.set_TargetLanguage(to_lang)
|
||||
request.set_method("POST")
|
||||
response = client.do_action_with_exception(request)
|
||||
result = json.loads(response)
|
||||
if 'Code' not in result:
|
||||
raise Exception("No response from Alibaba")
|
||||
if result['Code'] != '200':
|
||||
raise Exception(result['Message'])
|
||||
return result['Data']['Translated']
|
||||
|
||||
def translate_tencent(text, from_lang, to_lang, api_config):
|
||||
secret_id = api_config.get('secret_id', '')
|
||||
secret_key = api_config.get('secret_key', '')
|
||||
region = api_config.get('region', 'ap-shanghai')
|
||||
if not secret_id:
|
||||
raise Exception("secret_id is required")
|
||||
if not secret_key:
|
||||
raise Exception("secret_key is required")
|
||||
if not region:
|
||||
raise Exception("region is required")
|
||||
|
||||
from tencentcloud.tmt.v20180321 import models
|
||||
from tencentcloud.common import credential
|
||||
from tencentcloud.tmt.v20180321 import tmt_client
|
||||
|
||||
request = models.TextTranslateRequest()
|
||||
request.SourceText = text
|
||||
request.Source = from_lang
|
||||
request.Target = to_lang
|
||||
request.ProjectId = 0
|
||||
cred = credential.Credential(secret_id, secret_key)
|
||||
client = tmt_client.TmtClient(cred, region)
|
||||
response = client.TextTranslate(request)
|
||||
result = json.loads(response.to_json_string())
|
||||
if 'Error' in result:
|
||||
raise Exception(result['Error']['Message'])
|
||||
if 'TargetText' not in result:
|
||||
raise Exception("No response from Tencent")
|
||||
return result['TargetText']
|
||||
|
||||
|
||||
def translate(text, from_lang, to_lang, api, api_config = {}):
|
||||
global caches
|
||||
if from_lang == 'zh_CN' or from_lang == 'zh_TW' or to_lang == 'zh_CN' or to_lang == 'zh_TW':
|
||||
os.environ['translators_default_region'] = 'China'
|
||||
else:
|
||||
os.environ['translators_default_region'] = 'EN'
|
||||
result = {
|
||||
"success": False,
|
||||
"message": "",
|
||||
"text": text,
|
||||
"translated_text": "",
|
||||
"from_lang": from_lang,
|
||||
"to_lang": to_lang,
|
||||
"api": api,
|
||||
"api_config": api_config
|
||||
}
|
||||
try:
|
||||
apis = get_translate_apis()
|
||||
find = False
|
||||
for group in apis['apis']:
|
||||
for item in group['children']:
|
||||
if item['key'] == api:
|
||||
find = item
|
||||
break
|
||||
if not find:
|
||||
result['message'] = 'translate_api_not_found'
|
||||
return result
|
||||
|
||||
# 检查语言是否支持
|
||||
from_lang = find['support'].get(from_lang, False)
|
||||
to_lang = find['support'].get(to_lang, False)
|
||||
if not from_lang or not to_lang:
|
||||
result['message'] = 'translate_language_not_support'
|
||||
return result
|
||||
|
||||
cache_name = f'{api}.{from_lang}.{to_lang}.{text}.' + json.dumps(api_config)
|
||||
cache_name = hashlib.md5(cache_name.encode('utf-8')).hexdigest()
|
||||
if cache_name in caches:
|
||||
result['translated_text'] = caches[cache_name]
|
||||
result['success'] = True
|
||||
return result
|
||||
|
||||
# print(find)
|
||||
if find['key'] == 'google':
|
||||
result['translated_text'] = translate_google(text, from_lang, to_lang, api_config)
|
||||
elif find['key'] == 'openai':
|
||||
result['translated_text'] = translate_openai(text, from_lang, to_lang, api_config)
|
||||
elif find['key'] == 'microsoft':
|
||||
result['translated_text'] = translate_microsoft(text, from_lang, to_lang, api_config)
|
||||
elif find['key'] == 'amazon':
|
||||
result['translated_text'] = translate_amazon(text, from_lang, to_lang, api_config)
|
||||
elif find['key'] == 'deepl':
|
||||
result['translated_text'] = translate_deepl(text, from_lang, to_lang, api_config)
|
||||
elif find['key'] == 'baidu':
|
||||
result['translated_text'] = translate_baidu(text, from_lang, to_lang, api_config)
|
||||
elif find['key'] == 'alibaba':
|
||||
result['translated_text'] = translate_alibaba(text, from_lang, to_lang, api_config)
|
||||
elif find['key'] == 'youdao':
|
||||
result['translated_text'] = translate_youdao(text, from_lang, to_lang, api_config)
|
||||
elif find['key'] == 'tencent':
|
||||
result['translated_text'] = translate_tencent(text, from_lang, to_lang, api_config)
|
||||
elif 'type' in find and find['type'] == 'translators':
|
||||
import translators as ts
|
||||
result['translated_text'] = ts.translate_text(text, from_language=from_lang, to_language=to_lang, translator=find['translator'], timeout=10)
|
||||
else:
|
||||
result['message'] = 'translate_api_not_support'
|
||||
return result
|
||||
|
||||
caches[cache_name] = result['translated_text']
|
||||
result['success'] = True
|
||||
return result
|
||||
|
||||
except Exception as e:
|
||||
print(e)
|
||||
result['message'] = str(e)
|
||||
return result
|
||||
Reference in New Issue
Block a user