Files
SillyTavern-extras/talkinghead/tha3/app/app.py
2024-01-18 22:04:02 +02:00

1230 lines
62 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""THA3 live mode for SillyTavern-extras.
This is the animation engine, running on top of the THA3 posing engine.
This module implements the live animation backend and serves the API. For usage, see `server.py`.
If you want to play around with THA3 expressions in a standalone app, see `manual_poser.py`.
"""
__all__ = ["set_emotion_from_classification", "set_emotion",
"unload",
"start_talking", "stop_talking",
"result_feed",
"talkinghead_load_file",
"launch"]
import atexit
import io
import json
import logging
import math
import os
import random
import sys
import time
import numpy as np
import threading
from typing import Any, Dict, List, NoReturn, Optional, Union
import PIL
import torch
from flask import Flask, Response
from flask_cors import CORS
from tha3.poser.modes.load_poser import load_poser
from tha3.poser.poser import Poser
from tha3.util import (torch_linear_to_srgb, resize_PIL_image,
extract_PIL_image_from_filelike, extract_pytorch_image_from_PIL_image)
from tha3.app.postprocessor import Postprocessor
from tha3.app.util import posedict_keys, posedict_key_to_index, load_emotion_presets, posedict_to_pose, to_talkinghead_image, RunningAverage
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# --------------------------------------------------------------------------------
# Global variables
# Default configuration for the animator, loaded when the plugin is launched.
# Doubles as the authoritative documentation of the animator settings (beside the animation driver docstrings and the actual source code).
animator_defaults = {"target_fps": 25, # Desired output frames per second. Note this only affects smoothness of the output (if hardware allows).
# The speed at which the animation evolves is based on wall time. Snapshots are rendered at the target FPS,
# or if the hardware is too slow to reach the target FPS, then as often as hardware allows.
# For smooth animation, make the FPS lower than what your hardware could produce, so that some compute
# remains untapped, available to smooth over the occasional hiccup from other running programs.
"crop_left": 0.0, # in units where the image width is 2.0
"crop_right": 0.0, # in units where the image width is 2.0
"crop_top": 0.0, # in units where the image height is 2.0
"crop_bottom": 0.0, # in units where the image height is 2.0
"pose_interpolator_step": 0.1, # 0 < this <= 1; at each frame at a reference of 25 FPS; FPS-corrected automatically; see `interpolate_pose`.
"blink_interval_min": 2.0, # seconds, lower limit for random minimum time until next blink is allowed.
"blink_interval_max": 5.0, # seconds, upper limit for random minimum time until next blink is allowed.
"blink_probability": 0.03, # At each frame at a reference of 25 FPS; FPS-corrected automatically.
"blink_confusion_duration": 10.0, # seconds, upon entering "confusion" emotion, during which blinking quickly in succession is allowed.
"talking_fps": 12, # How often to re-randomize mouth during talking animation.
# Early 2000s anime used ~12 FPS as the fastest actual framerate of new cels (not counting camera panning effects and such).
"talking_morph": "mouth_aaa_index", # which mouth-open morph to use for talking; for available values, see `posedict_keys`
"sway_morphs": ["head_x_index", "head_y_index", "neck_z_index", "body_y_index", "body_z_index"], # which morphs to sway; see `posedict_keys`
"sway_interval_min": 5.0, # seconds, lower limit for random time interval until randomizing new sway pose.
"sway_interval_max": 10.0, # seconds, upper limit for random time interval until randomizing new sway pose.
"sway_macro_strength": 0.6, # [0, 1], in sway pose, max abs deviation from emotion pose target morph value for each sway morph,
# but also max deviation from center. The emotion pose itself may use higher values; in such cases,
# sway will only occur toward the center. See `compute_sway_target_pose` for details.
"sway_micro_strength": 0.02, # [0, 1], max abs random noise added each frame. No limiting other than a clamp of final pose to [-1, 1].
"breathing_cycle_duration": 4.0, # seconds, for a full breathing cycle.
"postprocessor_chain": []} # Pixel-space glitch artistry settings; see `postprocessor.py`.
talkinghead_basedir = "talkinghead"
global_animator_instance = None
_animator_output_lock = threading.Lock() # protect from concurrent access to `result_image` and the `new_frame_available` flag.
global_encoder_instance = None
global_latest_frame_sent = None
# These need to be written to by the API functions.
#
# Since the plugin might not have been started yet at that time (so the animator instance might not exist),
# it's better to keep this state in module-level globals rather than in attributes of the animator.
animation_running = False # used in initial bootup state, and while loading a new image
current_emotion = "neutral"
is_talking = False
global_reload_image = None
target_fps = 25 # value overridden by `load_animator_settings` at animator startup
# --------------------------------------------------------------------------------
# API
# Flask setup
app = Flask(__name__)
CORS(app)
def set_emotion_from_classification(emotion_scores: List[Dict[str, Union[str, float]]]) -> str:
"""Set the current emotion of the character based on sentiment analysis results.
Currently, we pick the emotion with the highest confidence score.
`emotion_scores`: results from classify module: [{"label": emotion0, "score": confidence0}, ...]
Return a status message for passing over HTTP.
"""
highest_score = float("-inf")
highest_label = None
for item in emotion_scores:
if item["score"] > highest_score:
highest_score = item["score"]
highest_label = item["label"]
logger.info(f"set_emotion_from_classification: winning score: {highest_label} = {highest_score}")
return set_emotion(highest_label)
def set_emotion(emotion: str) -> str:
"""Set the current emotion of the character.
Return a status message for passing over HTTP.
"""
global current_emotion
if emotion not in global_animator_instance.emotions:
logger.warning(f"set_emotion: specified emotion '{emotion}' does not exist, selecting 'neutral'")
emotion = "neutral"
logger.info(f"set_emotion: applying emotion {emotion}")
current_emotion = emotion
return f"emotion set to {emotion}"
def unload() -> str:
"""Stop animation.
Return a status message for passing over HTTP.
"""
global animation_running
animation_running = False
logger.info("unload: animation paused")
return "animation paused"
def start_talking() -> str:
"""Start talking animation.
Return a status message for passing over HTTP.
"""
global is_talking
is_talking = True
logger.debug("start_talking called")
return "talking started"
def stop_talking() -> str:
"""Stop talking animation.
Return a status message for passing over HTTP.
"""
global is_talking
is_talking = False
logger.debug("stop_talking called")
return "talking stopped"
# There are three tasks we must do each frame:
#
# 1) Render an animation frame
# 2) Encode the new animation frame for network transport
# 3) Send the animation frame over the network
#
# Instead of running serially:
#
# [render1][encode1][send1] [render2][encode2][send2]
# ------------------------------------------------------> time
#
# we get better throughput by parallelizing and interleaving:
#
# [render1] [render2] [render3] [render4] [render5]
# [encode1] [encode2] [encode3] [encode4]
# [send1] [send2] [send3]
# ----------------------------------------------------> time
#
# Despite the global interpreter lock, this increases throughput, as well as improves the timing of the network send
# since the network thread only needs to care about getting the send timing right.
#
# Either there's enough waiting for I/O for the split between render and encode to make a difference, or it's the fact
# that much of the compute-heavy work in both of those is performed inside C libraries that release the GIL (Torch,
# and the PNG encoder in Pillow, respectively).
#
# This is a simplified picture. Some important details:
#
# - At startup:
# - The animator renders the first frame on its own.
# - The encoder waits for the animator to publish a frame, and then starts normal operation.
# - The network thread waits for the encoder to publish a frame, and then starts normal operation.
# - In normal operation (after startup):
# - The animator waits until the encoder has consumed the previous published frame. Then it proceeds to render and publish a new frame.
# - This communication is handled through the flag `animator.new_frame_available`.
# - The network thread does its own thing on a regular schedule, based on the desired target FPS.
# - However, the network thread publishes metadata on which frame is the latest that has been sent over the network at least once.
# This is stored as an `id` (i.e. memory address) in `global_latest_frame_sent`.
# - If the target FPS is too high for the animator and/or encoder to keep up with, the network thread re-sends
# the latest frame published by the encoder as many times as necessary, to keep the network output at the target FPS
# regardless of render/encode speed. This handles the case of hardware slower than the target FPS.
# - On localhost, the network send is very fast, under 0.15 ms.
# - The encoder uses the metadata to wait until the latest encoded frame has been sent at least once before publishing a new frame.
# This ensures that no more frames are generated than are actually sent, and syncs also the animator (because the animator is
# rate-limited by the encoder consuming its frames). This handles the case of hardware faster than the target FPS.
# - When the animator and encoder are fast enough to keep up with the target FPS, generally when frame N is being sent,
# frame N+1 is being encoded (or is already encoded, and waiting for frame N to be sent), and frame N+2 is being rendered.
#
def result_feed() -> Response:
"""Return a Flask `Response` that repeatedly yields the current image as 'image/png'."""
def generate():
global global_latest_frame_sent
last_frame_send_complete_time = None
last_report_time = None
send_duration_sec = 0.0
send_duration_statistics = RunningAverage()
while True:
# Send the latest available animation frame.
# Important: grab reference to `image_bytes` only once, since it will be atomically updated without a lock.
image_bytes = global_encoder_instance.image_bytes
if image_bytes is not None:
# How often should we send?
# - Excessive spamming can DoS the SillyTavern GUI, so there needs to be a rate limit.
# - OTOH, we must constantly send something, or the GUI will lock up waiting.
# Therefore, send at a target FPS that yields a nice-looking animation.
frame_duration_target_sec = 1 / target_fps
if last_frame_send_complete_time is not None:
time_now = time.time_ns()
this_frame_elapsed_sec = (time_now - last_frame_send_complete_time) / 10**9
# The 2* is a fudge factor. It doesn't matter if the frame is a bit too early, but we don't want it to be late.
time_until_frame_deadline = frame_duration_target_sec - this_frame_elapsed_sec - 2 * send_duration_sec
else:
time_until_frame_deadline = 0.0 # nothing rendered yet
if time_until_frame_deadline <= 0.0:
time_now = time.time_ns()
yield (b"--frame\r\n"
b"Content-Type: image/png\r\n\r\n" + image_bytes + b"\r\n")
global_latest_frame_sent = id(image_bytes) # atomic update, no need for lock
send_duration_sec = (time.time_ns() - time_now) / 10**9 # about 0.12 ms on localhost (compress_level=1 or 6, doesn't matter)
# print(f"send {send_duration_sec:0.6g}s") # DEBUG
# Update the FPS counter, measuring the time between network sends.
time_now = time.time_ns()
if last_frame_send_complete_time is not None:
this_frame_elapsed_sec = (time_now - last_frame_send_complete_time) / 10**9
send_duration_statistics.add_datapoint(this_frame_elapsed_sec)
last_frame_send_complete_time = time_now
else:
time.sleep(time_until_frame_deadline)
# Log the FPS counter in 5-second intervals.
time_now = time.time_ns()
if animation_running and (last_report_time is None or time_now - last_report_time > 5e9):
avg_send_sec = send_duration_statistics.average()
msec = round(1000 * avg_send_sec, 1)
target_msec = round(1000 * frame_duration_target_sec, 1)
fps = round(1 / avg_send_sec, 1) if avg_send_sec > 0.0 else 0.0
logger.info(f"output: {msec:.1f}ms [{fps:.1f} FPS]; target {target_msec:.1f}ms [{target_fps:.1f} FPS]")
last_report_time = time_now
else: # first frame not yet available
time.sleep(0.1)
return Response(generate(), mimetype="multipart/x-mixed-replace; boundary=frame")
# TODO: the input is a flask.request.file.stream; what's the type of that?
def talkinghead_load_file(stream) -> str:
"""Load image from stream and start animation."""
global global_reload_image
global animation_running
logger.info("talkinghead_load_file: loading new input image from stream")
try:
animation_running = False # pause animation while loading a new image
pil_image = PIL.Image.open(stream) # Load the image using PIL.Image.open
img_data = io.BytesIO() # Create a copy of the image data in memory using BytesIO
pil_image.save(img_data, format="PNG")
global_reload_image = PIL.Image.open(io.BytesIO(img_data.getvalue())) # Set the global_reload_image to a copy of the image data
except PIL.Image.UnidentifiedImageError:
logger.warning("Could not load input image from stream, loading blank")
full_path = os.path.join(os.getcwd(), os.path.join(talkinghead_basedir, "tha3", "images", "inital.png"))
global_reload_image = PIL.Image.open(full_path)
finally:
animation_running = True
return "OK"
def launch(device: str, model: str) -> Union[None, NoReturn]:
"""Launch the talking head plugin (live mode).
If the plugin fails to load, the process exits.
device: "cpu" or "cuda"
model: one of the folder names inside "talkinghead/tha3/models/"
"""
global global_animator_instance
global global_encoder_instance
try:
# If the animator already exists, clean it up first
if global_animator_instance is not None:
logger.info(f"launch: relaunching on device {device} with model {model}")
global_animator_instance.exit()
global_animator_instance = None
global_encoder_instance.exit()
global_encoder_instance = None
logger.info("launch: loading the THA3 posing engine")
poser = load_poser(model, device, modelsdir=os.path.join(talkinghead_basedir, "tha3", "models"))
global_animator_instance = Animator(poser, device)
global_encoder_instance = Encoder()
# Load initial blank character image
full_path = os.path.join(os.getcwd(), os.path.join(talkinghead_basedir, "tha3", "images", "inital.png"))
global_animator_instance.load_image(full_path)
global_animator_instance.start()
global_encoder_instance.start()
except RuntimeError as exc:
logger.error(exc)
sys.exit()
# --------------------------------------------------------------------------------
# Internal stuff
def convert_linear_to_srgb(image: torch.Tensor) -> torch.Tensor:
"""RGBA (linear) -> RGBA (SRGB), preserving the alpha channel."""
rgb_image = torch_linear_to_srgb(image[0:3, :, :])
return torch.cat([rgb_image, image[3:4, :, :]], dim=0)
class Animator:
"""uWu Waifu"""
def __init__(self, poser: Poser, device: torch.device):
self.poser = poser
self.device = device
self.postprocessor = Postprocessor(device)
self.render_duration_statistics = RunningAverage()
self.animator_thread = None
self.source_image: Optional[torch.tensor] = None
self.result_image: Optional[np.array] = None
self.new_frame_available = False
self.last_report_time = None
self.reset_animation_state()
self.load_emotion_templates()
self.load_animator_settings()
# --------------------------------------------------------------------------------
# Management
def start(self) -> None:
"""Start the animation thread."""
self._terminated = False
def animator_update():
while not self._terminated:
try:
self.render_animation_frame()
except Exception as exc:
logger.error(exc)
raise # let the animator stop so we won't spam the log
time.sleep(0.01) # rate-limit the renderer to 100 FPS maximum (this could be adjusted later)
self.animator_thread = threading.Thread(target=animator_update, daemon=True)
self.animator_thread.start()
atexit.register(self.exit)
def exit(self) -> None:
"""Terminate the animation thread.
Called automatically when the process exits.
"""
self._terminated = True
self.animator_thread.join()
self.animator_thread = None
def reset_animation_state(self):
"""Reset character state trackers for all animation drivers."""
self.current_pose = None
self.last_emotion = None
self.last_emotion_change_timestamp = None
self.last_sway_target_timestamp = None
self.last_sway_target_pose = None
self.last_microsway_timestamp = None
self.sway_interval = None
self.last_blink_timestamp = None
self.blink_interval = None
self.last_talking_timestamp = None
self.last_talking_target_value = None
self.was_talking = False
self.breathing_epoch = time.time_ns()
def load_emotion_templates(self, emotions: Optional[Dict[str, Dict[str, float]]] = None) -> None:
"""Load emotion templates.
`emotions`: `{emotion0: {morph0: value0, ...}, ...}`
Optional dict of custom emotion templates.
If not given, this loads the templates from the emotion JSON files
in `talkinghead/emotions/`.
If given:
- Each emotion NOT supplied is populated from the defaults.
- In each emotion that IS supplied, each morph that is NOT mentioned
is implicitly set to zero (due to how `apply_emotion_to_pose` works).
For an example JSON file containing a suitable dictionary, see `talkinghead/emotions/_defaults.json`.
For available morph names, see `posedict_keys` in `talkinghead/tha3/app/util.py`.
For some more detail, see `talkinghead/tha3/poser/modes/pose_parameters.py`.
"Arity 2" means `posedict_keys` has separate left/right morphs.
If still in doubt, see the GUI panel implementations in `talkinghead/tha3/app/manual_poser.py`.
"""
# Load defaults as a base
self.emotions, self.emotion_names = load_emotion_presets(os.path.join("talkinghead", "emotions"))
# Then override defaults, and add any new custom emotions
if emotions is not None:
logger.info(f"load_emotion_templates: loading user-specified templates for emotions {list(sorted(emotions.keys()))}")
self.emotions.update(emotions)
emotion_names = set(self.emotion_names)
emotion_names.update(emotions.keys())
self.emotion_names = list(sorted(emotion_names))
else:
logger.info("load_emotion_templates: loaded default emotion templates")
def load_animator_settings(self, settings: Optional[Dict[str, Any]] = None) -> None:
"""Load animator settings.
`settings`: `{setting0: value0, ...}`
Optional dict of settings. The type and semantics of each value depends on each
particular setting.
For available settings, see `animator_defaults` in `talkinghead/tha3/app/app.py`.
Particularly for the setting `"postprocessor_chain"` (pixel-space glitch artistry),
see `talkinghead/tha3/app/postprocessor.py`.
"""
global target_fps
if settings is None:
settings = {}
logger.info(f"load_animator_settings: user-provided settings: {settings}")
# Load server-side settings (`talkinghead/animator.json`)
try:
animator_config_path = os.path.join(talkinghead_basedir, "animator.json")
with open(animator_config_path, "r") as json_file:
server_settings = json.load(json_file)
except Exception as exc:
logger.info(f"load_animator_settings: skipping server settings, reason: {exc}")
server_settings = {}
# Let's define some helpers:
def drop_unrecognized(settings: Dict[str, Any], context: str) -> None: # DANGER: MUTATING FUNCTION
unknown_fields = [field for field in settings if field not in animator_defaults]
if unknown_fields:
logger.warning(f"load_animator_settings: in {context}: this server did not recognize the following settings, ignoring them: {unknown_fields}")
for field in unknown_fields:
settings.pop(field)
assert all(field in animator_defaults for field in settings) # contract: only known settings remaining
def typecheck(settings: Dict[str, Any], context: str) -> None: # DANGER: MUTATING FUNCTION
for field, default_value in animator_defaults.items():
type_match = (int, float) if isinstance(default_value, (int, float)) else type(default_value)
if field in settings and not isinstance(settings[field], type_match):
logger.warning(f"load_animator_settings: in {context}: incorrect type for '{field}': got {type(settings[field])} with value '{settings[field]}', expected {type_match}")
settings.pop(field) # (safe; this is not the collection we are iterating over)
def aggregate(settings: Dict[str, Any], fallback_settings: Dict[str, Any], fallback_context: str) -> None: # DANGER: MUTATING FUNCTION
for field, default_value in fallback_settings.items():
if field not in settings:
logger.info(f"load_animator_settings: filling in '{field}' from {fallback_context}")
settings[field] = default_value
# Now our settings loading strategy is as simple as:
settings = dict(settings) # copy to avoid modifying the original, since we'll pop some stuff.
if settings:
drop_unrecognized(settings, context="user settings")
typecheck(settings, context="user settings")
if server_settings:
drop_unrecognized(server_settings, context="server settings")
typecheck(server_settings, context="server settings")
# both `settings` and `server_settings` are fully valid at this point
aggregate(settings, fallback_settings=server_settings, fallback_context="server settings") # first fill in from server-side settings
aggregate(settings, fallback_settings=animator_defaults, fallback_context="built-in defaults") # then fill in from hardcoded defaults
logger.info(f"load_animator_settings: final settings (filled in as necessary): {settings}")
# Some settings must be applied explicitly.
logger.debug(f"load_animator_settings: Setting new target FPS = {settings['target_fps']}")
target_fps = settings.pop("target_fps") # global variable, controls the network send rate.
logger.debug("load_animator_settings: Sending new effect chain to postprocessor")
self.postprocessor.chain = settings.pop("postprocessor_chain") # ...and that's where the postprocessor reads its filter settings from.
# The rest of the settings we can just store in an attribute, and let the animation drivers read them from there.
self._settings = settings
def load_image(self, file_path=None) -> None:
"""Load the image file at `file_path`, and replace the current character with it.
Except, if `global_reload_image is not None`, use the global reload image data instead.
In that case `file_path` is not used.
When done, this always sets `global_reload_image` to `None`.
"""
global global_reload_image
try:
if global_reload_image is not None:
pil_image = global_reload_image
else:
pil_image = resize_PIL_image(
extract_PIL_image_from_filelike(file_path),
(self.poser.get_image_size(), self.poser.get_image_size()))
w, h = pil_image.size
if pil_image.size != (512, 512):
logger.info("Resizing Char Card to work")
pil_image = to_talkinghead_image(pil_image)
w, h = pil_image.size
if pil_image.mode != "RGBA":
logger.error("load_image: image must have alpha channel")
self.source_image = None
else:
self.source_image = extract_pytorch_image_from_PIL_image(pil_image) \
.to(self.device).to(self.poser.get_dtype())
except Exception as exc:
logger.error(f"load_image: {exc}")
finally:
global_reload_image = None
# --------------------------------------------------------------------------------
# Animation drivers
def apply_emotion_to_pose(self, emotion_posedict: Dict[str, float], pose: List[float]) -> List[float]:
"""Copy all morphs except breathing from `emotion_posedict` to `pose`.
If a morph does not exist in `emotion_posedict`, its value is copied from the original `pose`.
Return the modified pose.
"""
new_pose = list(pose) # copy
for idx, key in enumerate(posedict_keys):
if key in emotion_posedict and key != "breathing_index":
new_pose[idx] = emotion_posedict[key]
return new_pose
def animate_blinking(self, pose: List[float]) -> List[float]:
"""Eye blinking animation driver.
Relevant `self._settings` keys:
`"blink_interval_min"`: float, seconds, lower limit for random minimum time until next blink is allowed.
`"blink_interval_max"`: float, seconds, upper limit for random minimum time until next blink is allowed.
`"blink_probability"`: float, at each frame at a reference of 25 FPS. FPS-corrected automatically.
`"blink_confusion_duration"`: float, seconds, upon entering "confusion" emotion, during which blinking
quickly in succession is allowed.
Return the modified pose.
"""
# Compute FPS-corrected blink probability
CALIBRATION_FPS = 25
p_orig = self._settings["blink_probability"] # blink probability per frame at CALIBRATION_FPS
avg_render_sec = self.render_duration_statistics.average()
if avg_render_sec > 0:
avg_render_fps = 1 / avg_render_sec
# Even if render completes faster, the `talkinghead` output is rate-limited to `target_fps` at most.
avg_render_fps = min(avg_render_fps, target_fps)
else: # No statistics available yet; let's assume we're running at `target_fps`.
avg_render_fps = target_fps
# We give an independent trial for each of `n` "normalized frames" elapsed at `CALIBRATION_FPS` during one actual frame at `avg_render_fps`.
# Note direction: rendering faster (higher FPS) means less likely to blink per frame, to obtain the same blink density per unit of wall time.
n = CALIBRATION_FPS / avg_render_fps
# If at least one of the normalized frames wants to blink, then the actual frame should blink.
# Doesn't matter that `n` isn't an integer, since the power function over the reals is continuous and we just want a reasonable scaling here.
p_scaled = 1.0 - (1.0 - p_orig)**n
should_blink = (random.random() <= p_scaled)
debug_fps = round(avg_render_fps, 1)
logger.debug(f"animate_blinking: p @ {CALIBRATION_FPS} FPS = {p_orig}, scaled p @ {debug_fps:.1f} FPS = {p_scaled:0.6g}")
# Prevent blinking too fast in succession.
time_now = time.time_ns()
if self.blink_interval is not None:
# ...except when the "confusion" emotion has been entered recently.
seconds_since_last_emotion_change = (time_now - self.last_emotion_change_timestamp) / 10**9
if current_emotion == "confusion" and seconds_since_last_emotion_change < self._settings["blink_confusion_duration"]:
pass
else:
seconds_since_last_blink = (time_now - self.last_blink_timestamp) / 10**9
if seconds_since_last_blink < self.blink_interval:
should_blink = False
if not should_blink:
return pose
# If there should be a blink, set the wink morphs to 1.
new_pose = list(pose) # copy
for morph_name in ["eye_wink_left_index", "eye_wink_right_index"]:
idx = posedict_key_to_index[morph_name]
new_pose[idx] = 1.0
# Typical for humans is 12...20 times per minute, i.e. 5...3 seconds interval.
self.last_blink_timestamp = time_now
self.blink_interval = random.uniform(self._settings["blink_interval_min"],
self._settings["blink_interval_max"]) # seconds; duration of this blink before the next one can begin
return new_pose
def animate_talking(self, pose: List[float], target_pose: List[float]) -> List[float]:
"""Talking animation driver.
Relevant `self._settings` keys:
`"talking_fps"`: float, how often to re-randomize mouth during talking animation.
Early 2000s anime used ~12 FPS as the fastest actual framerate of
new cels (not counting camera panning effects and such).
`"talking_morph"`: str, see `posedict_keys` for available values.
Which morph to use for opening and closing the mouth during talking.
Any other morphs in the mouth-open group are set to zero while
talking is in progress.
Works by randomizing the mouth-open state in regular intervals.
When talking ends, the mouth immediately snaps to its position in the target pose
(to avoid a slow, unnatural closing, since most expressions have the mouth closed).
Return the modified pose.
"""
MOUTH_OPEN_MORPHS = ["mouth_aaa_index", "mouth_iii_index", "mouth_uuu_index", "mouth_eee_index", "mouth_ooo_index", "mouth_delta"]
talking_morph = self._settings["talking_morph"]
if not is_talking:
try:
if self.was_talking: # when talking ends, snap mouth to target immediately
new_pose = list(pose) # copy
for key in MOUTH_OPEN_MORPHS:
idx = posedict_key_to_index[key]
new_pose[idx] = target_pose[idx]
return new_pose
return pose # most common case: do nothing (not talking, and wasn't talking during previous frame)
finally: # reset state *after* processing
self.last_talking_target_value = None
self.last_talking_timestamp = None
self.was_talking = False
assert is_talking
# With 25 FPS (or faster) output, randomizing the mouth every frame looks too fast.
# Determine whether enough wall time has passed to randomize a new mouth position.
TARGET_SEC = 1 / self._settings["talking_fps"] # rate of "actual new cels" in talking animation
time_now = time.time_ns()
update_mouth = False
if self.last_talking_timestamp is None:
update_mouth = True
else:
time_elapsed_sec = (time_now - self.last_talking_timestamp) / 10**9
if time_elapsed_sec >= TARGET_SEC:
update_mouth = True
# Apply the mouth open morph
new_pose = list(pose) # copy
idx = posedict_key_to_index[talking_morph]
if self.last_talking_target_value is None or update_mouth:
# Randomize new mouth position
x = pose[idx]
x = abs(1.0 - x) + random.uniform(-2.0, 2.0)
x = max(0.0, min(x, 1.0)) # clamp (not the manga studio)
self.last_talking_target_value = x
self.last_talking_timestamp = time_now
else:
# Keep the mouth at its latest randomized position (this overrides the interpolator that would pull the mouth toward the target emotion pose)
x = self.last_talking_target_value
new_pose[idx] = x
# Zero out other morphs that affect mouth open/closed state.
for key in MOUTH_OPEN_MORPHS:
if key == talking_morph:
continue
idx = posedict_key_to_index[key]
new_pose[idx] = 0.0
self.was_talking = True
return new_pose
def compute_sway_target_pose(self, original_target_pose: List[float]) -> List[float]:
"""History-free sway animation driver.
`original_target_pose`: emotion pose to modify with a randomized sway target
Relevant `self._settings` keys:
`"sway_morphs"`: List[str], which morphs can sway. By default, this is all geometric transformations,
but disabling some can be useful for some characters (such as robots).
For available values, see `posedict_keys`.
`"sway_interval_min"`: float, seconds, lower limit for random time interval until randomizing new sway pose.
`"sway_interval_max"`: float, seconds, upper limit for random time interval until randomizing new sway pose.
Note the limits are ignored when `original_target_pose` changes (then immediately refreshing
the sway pose), because an emotion pose may affect the geometric transformations, too.
`"sway_macro_strength"`: float, [0, 1]. In sway pose, max abs deviation from emotion pose target morph value
for each sway morph, but also max deviation from center. The `original_target_pose`
itself may use higher values; in such cases, sway will only occur toward the center.
See the source code of this function for the exact details.
`"sway_micro_strength"`: float, [0, 1]. Max abs random noise to sway target pose, added each frame, to make
the animation look less robotic. No limiting other than a clamp of final pose to [-1, 1].
The sway target pose is randomized again when necessary; this takes care of caching internally.
Return the modified pose.
"""
# We just modify the target pose, and let the ODE integrator (`interpolate_pose`) do the actual animation.
# - This way we don't need to track start state, progress, etc.
# - This also makes the animation nonlinear automatically: a saturating exponential trajectory toward the target.
# - If we want a smooth start toward a target pose/morph, we can e.g. save the timestamp when the animation began, and then ramp the rate of change,
# beginning at zero and (some time later, as measured from the timestamp) ending at the original, non-ramped value. The ODE itself takes care of
# slowing down when we approach the target state.
# As documented in the original THA tech reports, on the pose axes, zero is centered, and 1.0 = 15 degrees.
random_max = self._settings["sway_macro_strength"] # max sway magnitude from center position of each morph
noise_max = self._settings["sway_micro_strength"] # amount of dynamic noise (re-generated every frame), added on top of the sway target, no clamping except to [-1, 1]
SWAYPARTS = self._settings["sway_morphs"] # some characters might not sway on all axes (e.g. a robot)
def macrosway() -> List[float]: # this handles caching and everything
time_now = time.time_ns()
should_pick_new_sway_target = True
if current_emotion == self.last_emotion:
if self.sway_interval is not None: # have we created a swayed pose at least once?
seconds_since_last_sway_target = (time_now - self.last_sway_target_timestamp) / 10**9
if seconds_since_last_sway_target < self.sway_interval:
should_pick_new_sway_target = False
# else, emotion has changed, invalidating the old sway target, because it is based on the old emotion (since emotions may affect the pose too).
if not should_pick_new_sway_target:
if self.last_sway_target_pose is not None: # When keeping the same sway target, return the cached sway pose if we have one.
return self.last_sway_target_pose
else: # Should not happen, but let's be robust.
return original_target_pose
new_target_pose = list(original_target_pose) # copy
for key in SWAYPARTS:
idx = posedict_key_to_index[key]
target_value = original_target_pose[idx]
# Determine the random range so that the swayed target always stays within `[-random_max, random_max]`, regardless of `target_value`.
# TODO: This is a simple zeroth-order solution that just cuts the random range.
# Would be nicer to *gradually* decrease the available random range on the "outside" as the target value gets further from the origin.
random_upper = max(0, random_max - target_value) # e.g. if target_value = 0.2, then random_upper = 0.4 => max possible = 0.6 = random_max
random_lower = min(0, -random_max - target_value) # e.g. if target_value = -0.2, then random_lower = -0.4 => min possible = -0.6 = -random_max
random_value = random.uniform(random_lower, random_upper)
new_target_pose[idx] = target_value + random_value
self.last_sway_target_pose = new_target_pose
self.last_sway_target_timestamp = time_now
self.sway_interval = random.uniform(self._settings["sway_interval_min"],
self._settings["sway_interval_max"]) # seconds; duration of this sway target before randomizing new one
return new_target_pose
# Add dynamic noise (re-generated at 25 FPS) to the target to make the animation look less robotic, especially once we are near the target pose.
def add_microsway() -> None: # DANGER: MUTATING FUNCTION
CALIBRATION_FPS = 25 # FPS at which randomizing a new microsway target looks good
time_now = time.time_ns()
should_microsway = True
if self.last_microsway_timestamp is not None:
seconds_since_last_microsway = (time_now - self.last_microsway_timestamp) / 10**9
if seconds_since_last_microsway < 1 / CALIBRATION_FPS:
should_microsway = False
if should_microsway:
for key in SWAYPARTS:
idx = posedict_key_to_index[key]
x = new_target_pose[idx] + random.uniform(-noise_max, noise_max)
x = max(-1.0, min(x, 1.0))
new_target_pose[idx] = x
self.last_microsway_timestamp = time_now
new_target_pose = macrosway()
add_microsway()
return new_target_pose
def animate_breathing(self, pose: List[float]) -> List[float]:
"""Breathing animation driver.
Relevant `self._settings` keys:
`"breathing_cycle_duration"`: seconds. Duration of one full breathing cycle.
Return the modified pose.
"""
breathing_cycle_duration = self._settings["breathing_cycle_duration"] # seconds
time_now = time.time_ns()
t = (time_now - self.breathing_epoch) / 10**9 # seconds since breathing-epoch
cycle_pos = t / breathing_cycle_duration # number of cycles since breathing-epoch
if cycle_pos > 1.0: # prevent loss of accuracy in long sessions
self.breathing_epoch = time_now # TODO: be more accurate here, should sync to a whole cycle
cycle_pos = cycle_pos - float(int(cycle_pos)) # fractional part
new_pose = list(pose) # copy
idx = posedict_key_to_index["breathing_index"]
new_pose[idx] = math.sin(cycle_pos * math.pi)**2 # 0 ... 1 ... 0, smoothly, with slow start and end, fast middle
return new_pose
def interpolate_pose(self, pose: List[float], target_pose: List[float]) -> List[float]:
"""Interpolate from current `pose` toward `target_pose`.
Relevant `self._settings` keys:
`"pose_interpolator_step"`: [0, 1]; how far toward `target_pose` to interpolate in one frame,
assuming a reference of 25 FPS. This is FPS-corrected automatically.
0 is fully `pose`, 1 is fully `target_pose`.
This is a kind of history-free rate-based formulation, which needs only the current and target poses, and
the step size; there is no need to keep track of e.g. the initial pose or the progress along the trajectory.
Note that looping back the output as `pose`, while keeping `target_pose` constant, causes the current pose
to approach `target_pose` on a saturating trajectory. This is because `step` is the fraction of the *current*
difference between `pose` and `target_pose`, which obviously becomes smaller after each repeat.
This is a feature, not a bug!
"""
# The `step` parameter is calibrated against animation at 25 FPS, so we must scale it appropriately, taking
# into account the actual FPS.
#
# How to do this requires some explanation. Numericist hat on. Let's do a quick back-of-the-envelope calculation.
# This pose interpolator is essentially a solver for the first-order ODE:
#
# u' = f(u, t)
#
# Consider the most common case, where the target pose remains constant over several animation frames.
# Furthermore, consider just one morph (they all behave similarly). Then our ODE is Newton's law of cooling:
#
# u' = -β [u - u∞]
#
# where `u = u(t)` is the temperature, `u∞` is the constant temperature of the external environment,
# and `β > 0` is a material-dependent cooling coefficient.
#
# But instead of numerical simulation at a constant timestep size, as would be typical in computational science,
# we instead read off points off the analytical solution curve. The `step` parameter is *not* the timestep size;
# instead, it controls the relative distance along the *u* axis that should be covered in one simulation step,
# so it is actually related to the cooling coefficient β.
#
# (How exactly: write the left-hand side as `[unew - uold] / Δt + O([Δt]²)`, drop the error term, and decide
# whether to use `uold` (forward Euler) or `unew` (backward Euler) as `u` on the right-hand side. Then compare
# to our update formula. But those details don't matter here.)
#
# To match the notation in the rest of this code, let us denote the temperature (actually pose morph value) as `x`
# (instead of `u`). And to keep notation shorter, let `β := step` (although it's not exactly the `β` of the
# continuous-in-time case above).
#
# To scale the animation speed linearly with regard to FPS, we must invert the relation between simulation step
# number `n` and the solution value `x`. For an initial value `x0`, a constant target value `x∞`, and constant
# step `β ∈ (0, 1]`, the pose interpolator produces the sequence:
#
# x1 = x0 + β [x∞ - x0] = [1 - β] x0 + β x∞
# x2 = x1 + β [x∞ - x1] = [1 - β] x1 + β x∞
# x3 = x2 + β [x∞ - x2] = [1 - β] x2 + β x∞
# ...
#
# Note that with exact arithmetic, if `β < 1`, the final value is only reached in the limit `n → ∞`.
# For floating point, this is not the case. Eventually the increment becomes small enough that when
# it is added, nothing happens. After sufficiently many steps, in practice `x` will stop just slightly
# short of `x∞` (on the side it approached the target from).
#
# (For performance reasons, when approaching zero, one may need to beware of denormals, because those
# are usually implemented in (slow!) software on modern CPUs. So especially if the target is zero,
# it is useful to have some very small cutoff (inside the normal floating-point range) after which
# we make `x` instantly jump to the target value.)
#
# Inserting the definition of `x1` to the formula for `x2`, we can express `x2` in terms of `x0` and `x∞`:
#
# x2 = [1 - β] ([1 - β] x0 + β x∞) + β x∞
# = [1 - β]² x0 + [1 - β] β x∞ + β x∞
# = [1 - β]² x0 + [[1 - β] + 1] β x∞
#
# Then inserting this to the formula for `x3`:
#
# x3 = [1 - β] ([1 - β]² x0 + [[1 - β] + 1] β x∞) + β x∞
# = [1 - β]³ x0 + [1 - β]² β x∞ + [1 - β] β x∞ + β x∞
#
# To simplify notation, define:
#
# α := 1 - β
#
# We have:
#
# x1 = α x0 + [1 - α] x∞
# x2 = α² x0 + [1 - α] [1 + α] x∞
# = α² x0 + [1 - α²] x∞
# x3 = α³ x0 + [1 - α] [1 + α + α²] x∞
# = α³ x0 + [1 - α³] x∞
#
# This suggests that the general pattern is (as can be proven by induction on `n`):
#
# xn = α**n x0 + [1 - α**n] x∞
#
# This allows us to determine `x` as a function of simulation step number `n`. Now the scaling question becomes:
# if we want to reach a given value `xn` by some given step `n_scaled` (instead of the original step `n`),
# how must we change the step size `β` (or equivalently, the parameter `α`)?
#
# To simplify further, observe:
#
# x1 = α x0 + [1 - α] [[x∞ - x0] + x0]
# = [α + [1 - α]] x0 + [1 - α] [x∞ - x0]
# = x0 + [1 - α] [x∞ - x0]
#
# Rearranging yields:
#
# [x1 - x0] / [x∞ - x0] = 1 - α
#
# which gives us the relative distance from `x0` to `x∞` that is covered in one step. This isn't yet much
# to write home about (it's essentially just a rearrangement of the definition of `x1`), but next, let's
# treat `x2` the same way:
#
# x2 = α² x0 + [1 - α] [1 + α] [[x∞ - x0] + x0]
# = [α² x0 + [1 - α²] x0] + [1 - α²] [x∞ - x0]
# = [α² + 1 - α²] x0 + [1 - α²] [x∞ - x0]
# = x0 + [1 - α²] [x∞ - x0]
#
# We obtain
#
# [x2 - x0] / [x∞ - x0] = 1 - α²
#
# which is the relative distance, from the original `x0` toward the final `x∞`, that is covered in two steps
# using the original step size `β = 1 - α`. Next up, `x3`:
#
# x3 = α³ x0 + [1 - α³] [[x∞ - x0] + x0]
# = α³ x0 + [1 - α³] [x∞ - x0] + [1 - α³] x0
# = x0 + [1 - α³] [x∞ - x0]
#
# Rearranging,
#
# [x3 - x0] / [x∞ - x0] = 1 - α³
#
# which is the relative distance covered in three steps. Hence, we have:
#
# xrel := [xn - x0] / [x∞ - x0] = 1 - α**n
#
# so that
#
# α**n = 1 - xrel (**)
#
# and (taking the natural logarithm of both sides)
#
# n log α = log [1 - xrel]
#
# Finally,
#
# n = [log [1 - xrel]] / [log α]
#
# Given `α`, this gives the `n` where the interpolator has covered the fraction `xrel` of the original distance.
# On the other hand, we can also solve (**) for `α`:
#
# α = (1 - xrel)**(1 / n)
#
# which, given desired `n`, gives us the `α` that makes the interpolator cover the fraction `xrel` of the original distance in `n` steps.
#
CALIBRATION_FPS = 25 # FPS for which the default value `step` was calibrated
xrel = 0.5 # just some convenient value
step = self._settings["pose_interpolator_step"]
alpha_orig = 1.0 - step
if 0 < alpha_orig < 1:
avg_render_sec = self.render_duration_statistics.average()
if avg_render_sec > 0:
avg_render_fps = 1 / avg_render_sec
# Even if render completes faster, the `talkinghead` output is rate-limited to `target_fps` at most.
avg_render_fps = min(avg_render_fps, target_fps)
else: # No statistics available yet; let's assume we're running at `target_fps`.
avg_render_fps = target_fps
# For a constant target pose and original `α`, compute the number of animation frames to cover `xrel` of distance from initial pose to final pose.
n_orig = math.log(1.0 - xrel) / math.log(alpha_orig)
# Compute the scaled `n`. Note the direction: we need a smaller `n` (fewer animation frames) if the render runs slower than the calibration FPS.
n_scaled = (avg_render_fps / CALIBRATION_FPS) * n_orig
# Then compute the `α` that reaches `xrel` distance in `n_scaled` animation frames.
alpha_scaled = (1.0 - xrel)**(1 / n_scaled)
else: # avoid some divisions by zero at the extremes
alpha_scaled = alpha_orig
step_scaled = 1.0 - alpha_scaled
debug_fps = round(avg_render_fps, 1)
logger.debug(f"interpolate_pose: step @ {CALIBRATION_FPS} FPS = {step}, scaled step @ {debug_fps:.1f} FPS = {step_scaled:0.6g}")
# NOTE: This overwrites blinking, talking, and breathing, but that doesn't matter, because we apply this first.
# The other animation drivers then modify our result.
EPSILON = 1e-8
new_pose = list(pose) # copy
for idx, key in enumerate(posedict_keys):
# # We now animate blinking *after* interpolating the pose, so when blinking, the eyes close instantly.
# # This modification would make the blink also end instantly.
# if key in ["eye_wink_left_index", "eye_wink_right_index"]:
# new_pose[idx] = target_pose[idx]
# else:
# ...
delta = target_pose[idx] - pose[idx]
new_pose[idx] = pose[idx] + step_scaled * delta
# Prevent denormal floats (which are really slow); important when running on CPU and approaching zero.
# Our ϵ is really big compared to denormals; but there's no point in continuing to compute ever smaller
# differences in the animated value when it has already almost (and visually, completely) reached the target.
if abs(new_pose[idx] - target_pose[idx]) < EPSILON:
new_pose[idx] = target_pose[idx]
return new_pose
# --------------------------------------------------------------------------------
# Animation logic
def render_animation_frame(self) -> None:
"""Render an animation frame.
If the previous rendered frame has not been retrieved yet, do nothing.
"""
if not animation_running:
return
# If no one has retrieved the latest rendered frame yet, do not render a new one.
if self.new_frame_available:
return
if global_reload_image is not None:
self.load_image()
if self.source_image is None:
return
time_render_start = time.time_ns()
if self.current_pose is None: # initialize character pose at plugin startup
self.current_pose = posedict_to_pose(self.emotions[current_emotion])
emotion_posedict = self.emotions[current_emotion]
if current_emotion != self.last_emotion: # some animation drivers need to know when the emotion last changed
self.last_emotion_change_timestamp = time_render_start
target_pose = self.apply_emotion_to_pose(emotion_posedict, self.current_pose)
target_pose = self.compute_sway_target_pose(target_pose)
self.current_pose = self.interpolate_pose(self.current_pose, target_pose)
self.current_pose = self.animate_blinking(self.current_pose)
self.current_pose = self.animate_talking(self.current_pose, target_pose)
self.current_pose = self.animate_breathing(self.current_pose)
# Update this last so that animation drivers have access to the old emotion, too.
self.last_emotion = current_emotion
pose = torch.tensor(self.current_pose, device=self.device, dtype=self.poser.get_dtype())
with torch.no_grad():
# - [0]: model's output index for the full result image
# - model's data range is [-1, +1], linear intensity ("gamma encoded")
output_image = self.poser.pose(self.source_image, pose)[0].float()
# A simple crop filter, for removing empty space around character.
# Apply this first so that the postprocessor has fewer pixels to process.
c, h, w = output_image.shape
x1 = int((self._settings["crop_left"] / 2.0) * w)
x2 = int((1 - (self._settings["crop_right"] / 2.0)) * w)
y1 = int((self._settings["crop_top"] / 2.0) * h)
y2 = int((1 - (self._settings["crop_bottom"] / 2.0)) * h)
output_image = output_image[:, y1:y2, x1:x2]
# [-1, 1] -> [0, 1]
# output_image = (output_image + 1.0) / 2.0
output_image.add_(1.0)
output_image.mul_(0.5)
self.postprocessor.render_into(output_image) # apply pixel-space glitch artistry
output_image = convert_linear_to_srgb(output_image) # apply gamma correction
# convert [c, h, w] float -> [h, w, c] uint8
c, h, w = output_image.shape
output_image = torch.transpose(output_image.reshape(c, h * w), 0, 1).reshape(h, w, c)
output_image = (255.0 * output_image).byte()
output_image_numpy = output_image.detach().cpu().numpy()
# Update FPS counter, measuring animation frame render time only.
#
# This says how fast the renderer *can* run on the current hardware;
# note we don't actually render more frames than the client consumes.
time_now = time.time_ns()
if self.source_image is not None:
render_elapsed_sec = (time_now - time_render_start) / 10**9
self.render_duration_statistics.add_datapoint(render_elapsed_sec)
# Set the new rendered frame as the output image, and mark the frame as ready for consumption.
with _animator_output_lock:
self.result_image = output_image_numpy # atomic replace
self.new_frame_available = True
# Log the FPS counter in 5-second intervals.
if animation_running and (self.last_report_time is None or time_now - self.last_report_time > 5e9):
avg_render_sec = self.render_duration_statistics.average()
msec = round(1000 * avg_render_sec, 1)
fps = round(1 / avg_render_sec, 1) if avg_render_sec > 0.0 else 0.0
logger.info(f"render: {msec:.1f}ms [{fps} FPS available]")
self.last_report_time = time_now
class Encoder:
"""Network transport encoder.
We read each frame from the animator as it becomes ready, and keep it available in `self.image_bytes`
until the next frame arrives. The `self.image_bytes` buffer is replaced atomically, so this needs no lock
(you always get the latest available frame at the time you access `image_bytes`).
"""
def __init__(self) -> None:
self.image_bytes = None
self.encoder_thread = None
def start(self) -> None:
"""Start the output encoder thread."""
self._terminated = False
def encoder_update():
last_report_time = None
encode_duration_statistics = RunningAverage()
wait_duration_statistics = RunningAverage()
while not self._terminated:
# Retrieve a new frame from the animator if available.
have_new_frame = False
time_encode_start = time.time_ns()
with _animator_output_lock:
if global_animator_instance.new_frame_available:
image_rgba = global_animator_instance.result_image
global_animator_instance.new_frame_available = False # animation frame consumed; start rendering the next one
have_new_frame = True # This flag is needed so we can release the animator lock as early as possible.
# If a new frame arrived, pack it for sending (only once for each new frame).
if have_new_frame:
try:
pil_image = PIL.Image.fromarray(np.uint8(image_rgba[:, :, :3]))
if image_rgba.shape[2] == 4:
alpha_channel = image_rgba[:, :, 3]
pil_image.putalpha(PIL.Image.fromarray(np.uint8(alpha_channel)))
# Save as PNG with RGBA mode. Use the fastest compression level available.
#
# On an i7-12700H @ 2.3 GHz (laptop optimized for low fan noise):
# - `compress_level=1` (fastest), about 20 ms
# - `compress_level=6` (default), about 40 ms (!) - too slow!
# - `compress_level=9` (smallest size), about 120 ms
#
# time_now = time.time_ns()
buffer = io.BytesIO()
pil_image.save(buffer, format="PNG", compress_level=1)
image_bytes = buffer.getvalue()
# pack_duration_sec = (time.time_ns() - time_now) / 10**9
# We now have a new encoded frame; but first, sync with network send.
# This prevents from rendering/encoding more frames than are actually sent.
previous_frame = self.image_bytes
if previous_frame is not None:
time_wait_start = time.time_ns()
# Wait in 1ms increments until the previous encoded frame has been sent
while global_latest_frame_sent != id(previous_frame) and not self._terminated:
time.sleep(0.001)
time_now = time.time_ns()
wait_elapsed_sec = (time_now - time_wait_start) / 10**9
else:
wait_elapsed_sec = 0.0
self.image_bytes = image_bytes # atomic replace so no need for a lock
except Exception as exc:
logger.error(exc)
raise # let the encoder stop so we won't spam the log
# Update FPS counter.
time_now = time.time_ns()
walltime_elapsed_sec = (time_now - time_encode_start) / 10**9
encode_elapsed_sec = walltime_elapsed_sec - wait_elapsed_sec
encode_duration_statistics.add_datapoint(encode_elapsed_sec)
wait_duration_statistics.add_datapoint(wait_elapsed_sec)
# Log the FPS counter in 5-second intervals.
time_now = time.time_ns()
if animation_running and (last_report_time is None or time_now - last_report_time > 5e9):
avg_encode_sec = encode_duration_statistics.average()
msec = round(1000 * avg_encode_sec, 1)
avg_wait_sec = wait_duration_statistics.average()
wait_msec = round(1000 * avg_wait_sec, 1)
fps = round(1 / avg_encode_sec, 1) if avg_encode_sec > 0.0 else 0.0
logger.info(f"encode: {msec:.1f}ms [{fps} FPS available]; send sync wait {wait_msec:.1f}ms")
last_report_time = time_now
time.sleep(0.01) # rate-limit the encoder to 100 FPS maximum (this could be adjusted later)
self.encoder_thread = threading.Thread(target=encoder_update, daemon=True)
self.encoder_thread.start()
atexit.register(self.exit)
def exit(self) -> None:
"""Terminate the output encoder thread.
Called automatically when the process exits.
"""
self._terminated = True
self.encoder_thread.join()
self.encoder_thread = None