Optimize the translation code and significantly improve the batch translation speed

Former-commit-id: 0bf39548c64e3a9c3a4409845cc03a94bd140893
This commit is contained in:
Physton
2023-06-02 18:50:58 +08:00
parent f7fe92d47a
commit 628e7875ca
20 changed files with 833 additions and 331 deletions

View File

@@ -206,7 +206,13 @@ def on_app_started(_: gr.Blocks, app: FastAPI):
@app.post("/physton_prompt/translate")
async def _translate(text: str = Body(...), from_lang: str = Body(...), to_lang: str = Body(...), api: str = Body(...), api_config: dict = Body(...)):
return translate(text, from_lang, to_lang, api, api_config)
return {"success": hi.remove_histories(data['type'])}
@app.post("/physton_prompt/translates")
async def _translates(request: Request):
data = await request.json()
if 'texts' not in data or 'from_lang' not in data or 'to_lang' not in data or 'api' not in data or 'api_config' not in data:
return {"success": False, "message": "texts or from_lang or to_lang or api or api_config is required"}
return translate(data['texts'], data['from_lang'], data['to_lang'], data['api'], data['api_config'])
@app.get("/physton_prompt/get_csvs")
async def _get_csvs():

View File

@@ -1,322 +1,119 @@
from scripts.get_translate_apis import get_translate_apis
from scripts.sign_tencent import sign_tencent
import hashlib
import os
import requests
import uuid
import random
import json
import time
import hashlib
from scripts.get_translate_apis import get_translate_apis
from scripts.translator.alibaba_translator import AlibabaTranslator
from scripts.translator.amazon_translator import AmazonTranslator
from scripts.translator.baidu_translator import BaiduTranslator
from scripts.translator.deepl_translator import DeeplTranslator
from scripts.translator.google_tanslator import GoogleTranslator
from scripts.translator.microsoft_translator import MicrosoftTranslator
from scripts.translator.openai_translator import OpenaiTranslator
from scripts.translator.tencent_translator import TencentTranslator
from scripts.translator.translators_translator import TranslatorsTranslator
from scripts.translator.youdao_translator import YoudaoTranslator
caches = {}
def translate_google(text, from_lang, to_lang, api_config):
url = 'https://translation.googleapis.com/language/translate/v2/'
api_key = api_config.get('api_key', '')
if not api_key:
raise Exception("api_key is required")
params = {
'key': api_key,
'q': text,
'source': from_lang,
'target': to_lang,
'format': 'text'
}
response = requests.get(url, params=params, timeout=10)
result = response.json()
if 'error' in result:
raise Exception(result['error']['message'])
return result['data']['translations'][0]['translatedText']
def translate_openai(text, from_lang, to_lang, api_config):
import openai
openai.api_base = api_config.get('api_base', 'https://api.openai.com/v1')
openai.api_key = api_config.get('api_key', '')
model = api_config.get('model', 'gpt-3.5-turbo')
if not openai.api_key:
raise Exception("api_key is required")
messages = [
{"role": "system", "content": "You are a translator assistant."},
{"role":
"user",
"content": f"You are a translator assistant. Please translate the following JSON data {to_lang}. Preserve the original format. Only return the translation result, without any additional content or annotations. If the prompt word is in the target language, please send it to me unchanged:\n{text}"
},
]
completion = openai.ChatCompletion.create(model=model, messages=messages, timeout=60)
if len(completion.choices) == 0:
raise Exception("No response from OpenAI")
content = completion.choices[0].message.content
return content
def translate_microsoft(text, from_lang, to_lang, api_config):
url = 'https://api.cognitive.microsofttranslator.com/translate'
api_key = api_config.get('api_key', '')
region = api_config.get('region', '')
if not api_key:
raise Exception("api_key is required")
if not region:
raise Exception("region is required")
params = {
'api-version': '3.0',
'from': from_lang,
'to': to_lang
}
headers = {
'Ocp-Apim-Subscription-Key': api_key,
'Ocp-Apim-Subscription-Region': region,
'Content-type': 'application/json',
'X-ClientTraceId': str(uuid.uuid4())
}
body = [{
'text': text
}]
response = requests.post(url, params=params, headers=headers, json=body, timeout=10)
result = response.json()
if 'error' in result:
raise Exception(result['error']['message'])
if len(result) == 0:
raise Exception("No response from Microsoft")
return result[0]['translations'][0]['text']
def translate_amazon(text, from_lang, to_lang, api_config):
import boto3
api_key_id = api_config.get('api_key_id', '')
api_key_secret = api_config.get('api_key_secret', '')
region = api_config.get('region', '')
if not api_key_id:
raise Exception("api_key_id is required")
if not api_key_secret:
raise Exception("api_key_secret is required")
if not region:
raise Exception("region is required")
translate = boto3.client(service_name='translate', region_name=region, use_ssl=True, aws_access_key_id=api_key_id, aws_secret_access_key=api_key_secret)
result = translate.translate_text(Text=text, SourceLanguageCode=from_lang, TargetLanguageCode=to_lang)
if 'TranslatedText' not in result:
raise Exception("No response from Amazon")
return result['TranslatedText']
def translate_deepl(text, from_lang, to_lang, api_config):
url = 'https://api-free.deepl.com/v2/translate'
api_key = api_config.get('api_key', '')
if not api_key:
raise Exception("api_key is required")
headers = {"Authorization": f"DeepL-Auth-Key {api_key}"}
data = {
'text': text,
'source_lang': from_lang,
'target_lang': to_lang
}
response = requests.post(url, headers=headers, data=data, timeout=10)
result = response.json()
if 'message' in result:
raise Exception(result['message'])
if 'translations' not in result:
raise Exception("No response from DeepL")
return result['translations'][0]['text']
def translate_baidu(text, from_lang, to_lang, api_config):
url = "https://fanyi-api.baidu.com/api/trans/vip/translate"
app_id = api_config.get('app_id', '')
app_secret = api_config.get('app_secret', '')
if not app_id:
raise Exception("app_id is required")
if not app_secret:
raise Exception("app_secret is required")
salt = random.randint(32768, 65536)
sign = app_id + text + str(salt) + app_secret
sign = hashlib.md5(sign.encode()).hexdigest()
params = {
'q': text,
'from': from_lang,
'to': to_lang,
'appid': app_id,
'salt': salt,
'sign': sign
}
response = requests.get(url, params=params, timeout=10)
result = response.json()
if 'error_code' in result:
raise Exception(result['error_msg'])
if 'trans_result' not in result:
raise Exception("No response from Baidu")
translated_text = []
for item in result['trans_result']:
translated_text.append(item['dst'])
return '\n'.join(translated_text)
# return result['trans_result'][0]['dst']
def translate_youdao(text, from_lang, to_lang, api_config):
url = "https://openapi.youdao.com/api"
app_id = api_config.get('app_id', '')
app_secret = api_config.get('app_secret', '')
if not app_id:
raise Exception("app_id is required")
if not app_secret:
raise Exception("app_secret is required")
curtime = str(int(time.time()))
salt = random.randint(32768, 65536)
if(len(text) <= 20):
input = text
elif(len(text) > 20):
input = text[:10] + str(len(text)) + text[-10:]
sign = app_id + input + str(salt) + curtime + app_secret
sign = hashlib.sha256(sign.encode()).hexdigest()
params = {
'q': text,
'from': from_lang,
'to': to_lang,
'appKey': app_id,
'salt': salt,
'signType': 'v3',
'curtime': curtime,
'sign': sign
}
headers = {"Content-Type": "application/x-www-form-urlencoded; charset=UTF-8"}
response = requests.post(url, params=params, timeout=10, headers=headers)
result = response.json()
if 'errorCode' not in result:
raise Exception("No response from Youdao")
if result['errorCode'] != '0':
raise Exception(f'errorCode: {result["errorCode"]}')
return result['translation'][0]
def translate_alibaba(text, from_lang, to_lang, api_config):
access_key_id = api_config.get('access_key_id', '')
access_key_secret = api_config.get('access_key_secret', '')
region = api_config.get('region', 'cn-shanghai')
if not access_key_id:
raise Exception("access_key_id is required")
if not access_key_secret:
raise Exception("access_key_secret is required")
if not region:
raise Exception("region is required")
from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.acs_exception.exceptions import ClientException
from aliyunsdkcore.acs_exception.exceptions import ServerException
from aliyunsdkalimt.request.v20181012 import TranslateRequest
client = AcsClient(access_key_id, access_key_secret, region)
request = TranslateRequest.TranslateRequest()
request.set_SourceLanguage(from_lang)
request.set_Scene("general")
request.set_SourceText(text)
request.set_FormatType("text") #翻译文本的格式
request.set_TargetLanguage(to_lang)
request.set_method("POST")
response = client.do_action_with_exception(request)
result = json.loads(response)
if 'Code' not in result:
raise Exception("No response from Alibaba")
if result['Code'] != '200':
raise Exception(result['Message'])
return result['Data']['Translated']
def translate_tencent(text, from_lang, to_lang, api_config):
secret_id = api_config.get('secret_id', '')
secret_key = api_config.get('secret_key', '')
region = api_config.get('region', 'ap-shanghai')
if not secret_id:
raise Exception("secret_id is required")
if not secret_key:
raise Exception("secret_key is required")
if not region:
raise Exception("region is required")
params = {
'SourceText': text,
'Source': from_lang,
'Target': to_lang,
'ProjectId': 0
}
res = sign_tencent(secret_id, secret_key, region, params)
response = requests.post(res['url'], json=params, timeout=10, headers=res['headers'])
result = response.json()
if 'Response' not in result:
raise Exception("No response from Tencent")
if 'TargetText' not in result['Response']:
raise Exception("No response from Tencent")
return result['Response']['TargetText']
def translate(text, from_lang, to_lang, api, api_config = {}):
global caches
result = {
"success": False,
"message": "",
"text": text,
"translated_text": "",
"from_lang": from_lang,
"to_lang": to_lang,
"api": api,
"api_config": api_config
}
try:
apis = get_translate_apis()
find = False
for group in apis['apis']:
for item in group['children']:
if item['key'] == api:
find = item
break
if not find:
result['message'] = 'translate_api_not_found'
return result
# 检查语言是否支持
from_lang = find['support'].get(from_lang, False)
to_lang = find['support'].get(to_lang, False)
if not from_lang or not to_lang:
result['message'] = 'translate_language_not_support'
return result
def _translate_result(success, message, translated_text):
return {
"success": success,
"message": message,
"text": text,
"translated_text": translated_text,
"from_lang": from_lang,
"to_lang": to_lang,
"api": api,
"api_config": api_config
}
def _cache_name(text):
cache_name = f'{api}.{from_lang}.{to_lang}.{text}.' + json.dumps(api_config)
cache_name = hashlib.md5(cache_name.encode('utf-8')).hexdigest()
if cache_name in caches:
result['translated_text'] = caches[cache_name]
result['success'] = True
return result
return cache_name
# print(find)
if find['key'] == 'google':
result['translated_text'] = translate_google(text, from_lang, to_lang, api_config)
elif find['key'] == 'openai':
result['translated_text'] = translate_openai(text, from_lang, to_lang, api_config)
elif find['key'] == 'microsoft':
result['translated_text'] = translate_microsoft(text, from_lang, to_lang, api_config)
elif find['key'] == 'amazon':
result['translated_text'] = translate_amazon(text, from_lang, to_lang, api_config)
elif find['key'] == 'deepl':
result['translated_text'] = translate_deepl(text, from_lang, to_lang, api_config)
elif find['key'] == 'baidu':
result['translated_text'] = translate_baidu(text, from_lang, to_lang, api_config)
elif find['key'] == 'alibaba':
result['translated_text'] = translate_alibaba(text, from_lang, to_lang, api_config)
elif find['key'] == 'youdao':
result['translated_text'] = translate_youdao(text, from_lang, to_lang, api_config)
elif find['key'] == 'tencent':
result['translated_text'] = translate_tencent(text, from_lang, to_lang, api_config)
elif 'type' in find and find['type'] == 'translators':
region = api_config.get('region', 'CN')
os.environ['translators_default_region'] = region
from translators.server import translate_text, translate_html, translators_pool, preaccelerate, tss
tss.server_region = region
tss._bing.server_region = region
tss._google.server_region = region
result['translated_text'] = translate_text(text, from_language=from_lang, to_language=to_lang, translator=find['translator'], timeout=10)
apis = get_translate_apis()
find = False
for group in apis['apis']:
for item in group['children']:
if item['key'] == api:
find = item
break
if not find:
return _translate_result(False, 'translate api not found', '')
try:
texts = []
if isinstance(text, list):
if len(text) < 1:
return _translate_result(False, 'translate text is empty', '')
for item in text:
texts.append(None)
for index in range(len(text)):
item = text[index]
item = item.strip()
if item == '':
texts[index] = ''
continue
cache_name = _cache_name(item)
if cache_name in caches:
texts[index] = caches[cache_name]
else:
texts[index] = None
else:
result['message'] = 'translate_api_not_support'
return result
text = text.strip()
if text == '':
return _translate_result(False, 'translate text is empty', '')
cache_name = _cache_name(text)
if cache_name in caches:
return _translate_result(True, '', caches[cache_name])
result['translated_text'] = result['translated_text'].strip()
caches[cache_name] = result['translated_text']
result['success'] = True
return result
if api == 'google':
translator = GoogleTranslator()
elif api == 'microsoft':
translator = MicrosoftTranslator()
elif api == 'openai':
translator = OpenaiTranslator()
elif api == 'amazon':
translator = AmazonTranslator()
elif api == 'deepl':
translator = DeeplTranslator()
elif api == 'baidu':
translator = BaiduTranslator()
elif api == 'alibaba':
translator = AlibabaTranslator()
elif api == 'youdao':
translator = YoudaoTranslator()
elif api == 'tencent':
translator = TencentTranslator()
elif 'type' in find and find['type'] == 'translators':
translator = TranslatorsTranslator(api)
translator.set_translator(find['translator'])
else:
return _translate_result(False, 'translate api not support', '')
translator.set_from_lang(from_lang)
translator.set_to_lang(to_lang)
translator.set_api_config(api_config)
if isinstance(text, list):
translate_texts = []
translate_indexes = []
for index in range(len(texts)):
item = texts[index]
if item == None:
translate_indexes.append(index)
translate_texts.append(text[index])
result = translator.translate_batch(translate_texts)
for index in range(len(result)):
item = result[index]
texts[translate_indexes[index]] = item
caches[_cache_name(translate_texts[index])] = item
return _translate_result(True, '', texts)
else:
translated_text = translator.translate(text).strip()
caches[_cache_name(text)] = translated_text
return _translate_result(True, '', translated_text)
except Exception as e:
# print(e)
result['message'] = str(e)
return result
return _translate_result(False, str(e), '')

View File

@@ -0,0 +1,93 @@
from scripts.translator.base_tanslator import BaseTranslator
import json
from math import ceil
class AlibabaTranslator(BaseTranslator):
def __init__(self):
super().__init__('alibaba')
def _get_config(self):
access_key_id = self.api_config.get('access_key_id', '')
access_key_secret = self.api_config.get('access_key_secret', '')
region = self.api_config.get('region', 'cn-shanghai')
if not access_key_id:
raise Exception("access_key_id is required")
if not access_key_secret:
raise Exception("access_key_secret is required")
if not region:
raise Exception("region is required")
return access_key_id, access_key_secret, region
def translate(self, text):
if not text:
return ''
access_key_id, access_key_secret, region = self._get_config()
from aliyunsdkcore.client import AcsClient
from aliyunsdkalimt.request.v20181012 import TranslateRequest
client = AcsClient(access_key_id, access_key_secret, region)
request = TranslateRequest.TranslateRequest()
request.set_SourceLanguage(self.from_lang)
request.set_Scene("general")
request.set_SourceText(text)
request.set_FormatType("text") #翻译文本的格式
request.set_TargetLanguage(self.to_lang)
request.set_method("POST")
response = client.do_action_with_exception(request)
result = json.loads(response)
if 'Code' not in result:
raise Exception("No response from Alibaba")
if result['Code'] != '200':
raise Exception(result['Message'])
if 'Translated' not in result['Data']:
raise Exception("No response from Alibaba")
return result['Data']['Translated']
def translate_batch(self, texts):
if not texts:
return []
access_key_id, access_key_secret, region = self._get_config()
from aliyunsdkcore.client import AcsClient
from aliyunsdkalimt.request.v20181012 import GetBatchTranslateRequest
results = []
concurrent = self.get_concurrent()
texts_len = len(texts)
group_num = ceil(texts_len / concurrent)
for i in range(group_num):
start = i * concurrent
end = (i + 1) * concurrent
if end > texts_len:
end = texts_len
group_texts = texts[start:end]
source_texts = {}
dist_texts = {}
for i in range(len(group_texts)):
source_texts[str(i)] = group_texts[i]
dist_texts[str(i)] = ''
client = AcsClient(access_key_id, access_key_secret, region)
request = GetBatchTranslateRequest.GetBatchTranslateRequest()
request.set_SourceLanguage(self.from_lang)
request.set_Scene("general")
request.set_SourceText(source_texts)
request.set_FormatType("text")
request.set_TargetLanguage(self.to_lang)
request.set_ApiType("translate_standard")
request.set_method("POST")
response = client.do_action_with_exception(request)
result = json.loads(response)
if 'Code' not in result:
raise Exception("No response from Alibaba")
if result['Code'] != '200':
raise Exception(result['Message'])
for item in result['TranslatedList']:
index = item['index']
if item['code'] == '200':
dist_texts[index] = item['translated']
for i in range(len(group_texts)):
results.append(dist_texts[str(i)])
return results

View File

@@ -0,0 +1,25 @@
from scripts.translator.base_tanslator import BaseTranslator
class AmazonTranslator(BaseTranslator):
def __init__(self):
super().__init__('amazon')
def translate(self, text):
if not text:
return ''
api_key_id = self.api_config.get('api_key_id', '')
api_key_secret = self.api_config.get('api_key_secret', '')
region = self.api_config.get('region', '')
if not api_key_id:
raise Exception("api_key_id is required")
if not api_key_secret:
raise Exception("api_key_secret is required")
if not region:
raise Exception("region is required")
import boto3
translate = boto3.client(service_name='translate', region_name=region, use_ssl=True, aws_access_key_id=api_key_id, aws_secret_access_key=api_key_secret)
result = translate.translate_text(Text=text, SourceLanguageCode=self.from_lang, TargetLanguageCode=self.to_lang)
if 'TranslatedText' not in result:
raise Exception("No response from Amazon")
return result['TranslatedText']

View File

@@ -0,0 +1,53 @@
from scripts.translator.base_tanslator import BaseTranslator
import requests
import hashlib
import random
class BaiduTranslator(BaseTranslator):
def __init__(self):
super().__init__('baidu')
def translate(self, text):
if not text:
return ''
url = "https://fanyi-api.baidu.com/api/trans/vip/translate"
app_id = self.api_config.get('app_id', '')
app_secret = self.api_config.get('app_secret', '')
if not app_id:
raise Exception("app_id is required")
if not app_secret:
raise Exception("app_secret is required")
salt = random.randint(32768, 65536)
send_text = text
if isinstance(text, list):
send_text = '\n'.join(send_text)
sign = app_id + send_text + str(salt) + app_secret
sign = hashlib.md5(sign.encode()).hexdigest()
params = {
'q': send_text,
'from': self.from_lang,
'to': self.to_lang,
'appid': app_id,
'salt': salt,
'sign': sign
}
response = requests.get(url, params=params, timeout=10)
result = response.json()
if 'error_code' in result:
raise Exception(result['error_msg'])
if 'trans_result' not in result:
raise Exception("No response from Baidu")
translated_text = []
for item in result['trans_result']:
translated_text.append(item['dst'])
if isinstance(text, list):
return translated_text
else:
return '\n'.join(translated_text)
def translate_batch(self, texts):
if not texts:
return []
for text in texts:
text = text.replace('\n', ' ')
return self.translate(texts)

View File

@@ -0,0 +1,86 @@
from scripts.get_translate_apis import get_translate_apis
from abc import ABC, abstractmethod
from math import ceil
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from collections import OrderedDict
class BaseTranslator(ABC):
from_lang = None
to_lang = None
api = None
api_config = {}
api_item = {}
def __init__(self, api):
self.api = api
apis = get_translate_apis()
find = False
for group in apis['apis']:
for item in group['children']:
if item['key'] == api:
find = item
break
if not find:
raise Exception('translate api not found')
self.api_item = find
def set_from_lang(self, from_lang):
from_lang = self.api_item['support'].get(from_lang, False)
if not from_lang:
raise Exception('translate language not support')
self.from_lang = from_lang
return self
def set_to_lang(self, to_lang):
to_lang = self.api_item['support'].get(to_lang, False)
if not to_lang:
raise Exception('translate language not support')
self.to_lang = to_lang
return self
def set_api_config(self, api_config):
self.api_config = api_config
return self
def get_concurrent(self):
concurrent = 1
if self.api_item.get('concurrent', False):
concurrent = self.api_item['concurrent']
return concurrent
@abstractmethod
def translate(self, text):
pass
def translate_batch(self, texts):
self.concurrent = self.get_concurrent()
concurrent = self.get_concurrent()
texts_len = len(texts)
group_num = ceil(texts_len / concurrent)
# 分组并发翻译每组完成后等待1秒然后再进行下一组
results = []
with ThreadPoolExecutor(max_workers=concurrent) as executor:
for i in range(group_num):
group_texts = texts[i*concurrent: (i+1)*concurrent]
texts_dict = {}
futures = []
for i in range(len(group_texts)):
text = group_texts[i]
texts_dict[str(i)] = ''
future = executor.submit(self.translate, text)
futures.append(future)
for i in range(len(futures)):
future = futures[i]
text_dict = texts_dict
future.result()
text_dict[str(i)] = future.result()
for i in range(len(texts_dict)):
results.append(texts_dict[str(i)])
time.sleep(1)
return results

View File

@@ -0,0 +1,44 @@
from scripts.translator.base_tanslator import BaseTranslator
import requests
class DeeplTranslator(BaseTranslator):
def __init__(self):
super().__init__('deepl')
def translate(self, text):
if not text:
if isinstance(result['translations'], list):
return []
else:
return ''
url = 'https://api-free.deepl.com/v2/translate'
api_key = self.api_config.get('api_key', '')
if not api_key:
raise Exception("api_key is required")
headers = {"Authorization": f"DeepL-Auth-Key {api_key}"}
data = {
'text': text,
'source_lang': self.from_lang,
'target_lang': self.to_lang
}
response = requests.post(url, headers=headers, data=data, timeout=10)
if response.status_code != 200:
raise Exception("DeepL request error")
if not response.text:
raise Exception("DeepL response is empty")
result = response.json()
if 'message' in result:
raise Exception(result['message'])
if 'translations' not in result:
raise Exception("No response from DeepL")
if isinstance(result['translations'], list):
results = []
for item in result['translations']:
results.append(item['text'])
return results
else:
return result['translations'][0]['text']
def translate_batch(self, texts):
return self.translate(texts)

View File

@@ -0,0 +1,30 @@
from scripts.translator.base_tanslator import BaseTranslator
import requests
class GoogleTranslator(BaseTranslator):
def __init__(self):
super().__init__('google')
def translate(self, text):
if not text:
return ''
url = 'https://translation.googleapis.com/language/translate/v2/'
api_key = self.api_config.get('api_key', '')
if not api_key:
raise Exception("api_key is required")
params = {
'key': api_key,
'q': text,
'source': self.from_lang,
'target': self.to_lang,
'format': 'text'
}
response = requests.get(url, params=params, timeout=10)
result = response.json()
if 'error' in result:
raise Exception(result['error']['message'])
if 'data' not in result:
raise Exception("No response from Google")
if 'translations' not in result['data']:
raise Exception("No response from Google")
return result['data']['translations'][0]['translatedText']

View File

@@ -0,0 +1,54 @@
from scripts.translator.base_tanslator import BaseTranslator
import uuid
import requests
class MicrosoftTranslator(BaseTranslator):
def __init__(self):
super().__init__('microsoft')
def translate(self, text):
if not text:
if isinstance(result['translations'], list):
return []
else:
return ''
url = 'https://api.cognitive.microsofttranslator.com/translate'
api_key = self.api_config.get('api_key', '')
region = self.api_config.get('region', '')
if not api_key:
raise Exception("api_key is required")
if not region:
raise Exception("region is required")
params = {
'api-version': '3.0',
'from': self.from_lang,
'to': self.to_lang
}
headers = {
'Ocp-Apim-Subscription-Key': api_key,
'Ocp-Apim-Subscription-Region': region,
'Content-type': 'application/json',
'X-ClientTraceId': str(uuid.uuid4())
}
body = []
if isinstance(text, list):
for item in text:
body.append({'text': item})
else:
body.append({'text': text})
response = requests.post(url, params=params, headers=headers, json=body, timeout=10)
result = response.json()
if 'error' in result:
raise Exception(result['error']['message'])
if len(result) == 0:
raise Exception("No response from Microsoft")
if isinstance(text, list):
return [item['translations'][0]['text'] for item in result]
else:
return result[0]['translations'][0]['text']
def translate_batch(self, texts):
return self.translate(texts)

View File

@@ -0,0 +1,59 @@
from scripts.translator.base_tanslator import BaseTranslator
import json
class OpenaiTranslator(BaseTranslator):
def __init__(self):
super().__init__('openai')
def translate(self, text):
if not text:
if isinstance(text, list):
return []
else:
return ''
import openai
openai.api_base = self.api_config.get('api_base', 'https://api.openai.com/v1')
openai.api_key = self.api_config.get('api_key', '')
model = self.api_config.get('model', 'gpt-3.5-turbo')
if not openai.api_key:
raise Exception("api_key is required")
body = []
if isinstance(text, list):
for item in text:
body.append({'text': item})
else:
body.append({'text': text})
body_str = json.dumps(body, ensure_ascii=False)
messages = [
{"role": "system", "content": "You are a translator assistant."},
{"role":
"user",
"content": f"You are a translator assistant. Please translate the following JSON data {self.to_lang}. Preserve the original format. Only return the translation result, without any additional content or annotations. If the prompt word is in the target language, please send it to me unchanged:\n{body_str}"
},
]
completion = openai.ChatCompletion.create(model=model, messages=messages, timeout=60)
if len(completion.choices) == 0:
raise Exception("No response from OpenAI")
content = completion.choices[0].message.content
result_json = ''
try:
# 找到第一个[,然后找到最后一个],截取中间的内容
start = content.index('[')
end = content.rindex(']')
if start == -1 or end == -1:
raise Exception("OpenAI response error")
result_json = '['+ content[start+1:end] +']'
# 解析json
result = json.loads(result_json)
if isinstance(text, list):
return [item['text'] for item in result]
else:
return result[0]['text']
except Exception as e:
raise Exception("OpenAI response error")
def translate_batch(self, texts):
return self.translate(texts)

View File

@@ -1,10 +1,66 @@
from scripts.translator.base_tanslator import BaseTranslator
import requests
import hashlib, hmac, json, time
from datetime import datetime
class TencentTranslator(BaseTranslator):
def __init__(self):
super().__init__('tencent')
def _get_config(self):
secret_id = self.api_config.get('secret_id', '')
secret_key = self.api_config.get('secret_key', '')
region = self.api_config.get('region', 'ap-shanghai')
if not secret_id:
raise Exception("secret_id is required")
if not secret_key:
raise Exception("secret_key is required")
if not region:
raise Exception("region is required")
return secret_id, secret_key, region
def translate(self, text):
if not text:
return ''
secret_id, secret_key, region = self._get_config()
params = {
'SourceText': text,
'Source': self.from_lang,
'Target': self.to_lang,
'ProjectId': 0
}
res = sign_tencent(secret_id, secret_key, region, params)
response = requests.post(res['url'], json=params, timeout=10, headers=res['headers'])
result = response.json()
if 'Response' not in result:
raise Exception("No response from Tencent")
if 'TargetText' not in result['Response']:
raise Exception("No response from Tencent")
return result['Response']['TargetText']
def translate_batch(self, texts):
if not texts:
return []
secret_id, secret_key, region = self._get_config()
params = {
'SourceTextList': texts,
'Source': self.from_lang,
'Target': self.to_lang,
'ProjectId': 0
}
res = sign_tencent(secret_id, secret_key, region, params, 'TextTranslateBatch')
response = requests.post(res['url'], json=params, timeout=10, headers=res['headers'])
result = response.json()
if 'Response' not in result:
raise Exception("No response from Tencent")
if 'TargetTextList' not in result['Response']:
raise Exception("No response from Tencent")
return result['Response']['TargetTextList']
def sign_tencent(secret_id, secret_key, regin, params, action="TextTranslate", version = "2018-03-21"):
host = 'tmt.tencentcloudapi.com'
endpoint = "https://" + host
action = "TextTranslate"
service = "tmt"
algorithm = "TC3-HMAC-SHA256"
@@ -62,4 +118,4 @@ def sign_tencent(secret_id, secret_key, regin, params, action="TextTranslate", v
"X-TC-Version": version,
"X-TC-Region": regin,
},
}
}

View File

@@ -0,0 +1,18 @@
from scripts.translator.base_tanslator import BaseTranslator
import os
class TranslatorsTranslator(BaseTranslator):
translator = None
def set_translator(self, translator):
self.translator = translator
return self
def translate(self, text):
region = self.api_config.get('region', 'CN')
os.environ['translators_default_region'] = region
from translators.server import translate_text, tss
tss.server_region = region
tss._bing.server_region = region
tss._google.server_region = region
return translate_text(text, from_language=self.from_lang, to_language=self.to_lang, translator=self.translator, timeout=30)

View File

@@ -0,0 +1,71 @@
from scripts.translator.base_tanslator import BaseTranslator
import requests
import hashlib
import random
import time
class YoudaoTranslator(BaseTranslator):
def __init__(self):
super().__init__('youdao')
def translate(self, text):
if not text:
if isinstance(text, list):
return []
else:
return ''
if isinstance(text, list):
url = "https://openapi.youdao.com/v2/api"
else:
url = "https://openapi.youdao.com/api"
app_id = self.api_config.get('app_id', '')
app_secret = self.api_config.get('app_secret', '')
if not app_id:
raise Exception("app_id is required")
if not app_secret:
raise Exception("app_secret is required")
curtime = str(int(time.time()))
salt = random.randint(32768, 65536)
if isinstance(text, list):
sign_text = "".join(text)
else:
sign_text = text
if(len(sign_text) <= 20):
input = sign_text
elif(len(sign_text) > 20):
input = sign_text[:10] + str(len(sign_text)) + sign_text[-10:]
sign = app_id + input + str(salt) + curtime + app_secret
sign = hashlib.sha256(sign.encode()).hexdigest()
params = {
'q': text,
'from': self.from_lang,
'to': self.to_lang,
'appKey': app_id,
'salt': salt,
'signType': 'v3',
'curtime': curtime,
'sign': sign
}
headers = {"Content-Type": "application/x-www-form-urlencoded; charset=UTF-8"}
response = requests.post(url, params=params, timeout=10, headers=headers)
result = response.json()
if 'errorCode' not in result:
raise Exception("No response from Youdao")
if result['errorCode'] != '0':
raise Exception(f'errorCode: {result["errorCode"]}')
if isinstance(text, list):
if 'translateResults' not in result:
raise Exception("No response from Youdao")
results = []
for item in result['translateResults']:
if 'translation' not in item:
raise Exception("No response from Youdao")
results.append(item['translation'])
return results
else:
if 'translation' not in result:
raise Exception("No response from Youdao")
return result['translation'][0]
def translate_batch(self, texts):
return self.translate(texts)