import json
import sys
import re
import os
import io
import base64
import uuid
import random
import shlex
import subprocess
import logging
import argparse
from collections import defaultdict
from time import sleep, time

import numpy as np
import pandas as pd
import librosa
import redis
import torch
import openai
import spacy
from scipy.signal import savgol_filter
from tqdm import tqdm
from flask import redirect, send_file, Response, request, jsonify
from flask_socketio import emit
from TTS.api import TTS

from piedemo.fields.ajax_group import AjaxChatField, AjaxGroup
from piedemo.fields.grid import VStack, HStack, SpaceField
from piedemo.fields.inputs.hidden import InputHiddenField
from piedemo.fields.outputs.colored_text import ptext, OutputColoredTextField
from piedemo.fields.outputs.json import OutputJSONField
from piedemo.fields.outputs.progress import ProgressField
from piedemo.fields.outputs.video import OutputVideoField
from piedemo.hub.swagger_utils.method import describe, check_missing_keys
from piedemo.web import Web
from piedemo.page import Page
from piedemo.hub.svgpil import SVGImage
from piedemo.fields.outputs.table import OutputTableField
from piedemo.fields.inputs.int_list import InputIntListField
from piedemo.fields.navigation import Navigation
from piedemo.fields.inputs.chat import ChatField

from aihandler import AIHandler
from aihandler_stream import AIHandlerStream
from aihandler_new import AIHandlerNewStream
from pieinfer import PieInfer, render_video, construct_video

sys.path.append("/home/ubuntu/repo/slavaBIM")
from pages.demo import DemoPage

# Load the spaCy language model
nlp = spacy.load("en_core_web_lg")

# Read the OpenAI API key from the environment instead of hardcoding a secret in the source.
openai.api_key = os.environ.get("OPENAI_API_KEY")
# from openai.embeddings_utils import get_embedding, cosine_similarity

logging.getLogger('socketio').setLevel(logging.ERROR)
logging.getLogger('engineio').setLevel(logging.ERROR)

target_names = [
    "mouthSmileLeft", "mouthSmileRight",
    "mouthStretchLeft", "mouthStretchRight",
    "mouthUpperUpLeft", "mouthUpperUpRight",
]

model_bsList = [
    "browDownLeft", "browDownRight", "browInnerUp", "browOuterUpLeft", "browOuterUpRight",
    "cheekPuff", "cheekSquintLeft", "cheekSquintRight",
    "eyeBlinkLeft", "eyeBlinkRight", "eyeLookDownLeft", "eyeLookDownRight",
    "eyeLookInLeft", "eyeLookInRight", "eyeLookOutLeft", "eyeLookOutRight",
    "eyeLookUpLeft", "eyeLookUpRight", "eyeSquintLeft", "eyeSquintRight",
    "eyeWideLeft", "eyeWideRight",
    "jawForward", "jawLeft", "jawOpen", "jawRight",
    "mouthClose", "mouthDimpleLeft", "mouthDimpleRight", "mouthFrownLeft", "mouthFrownRight",
    "mouthFunnel", "mouthLeft", "mouthLowerDownLeft", "mouthLowerDownRight",
    "mouthPressLeft", "mouthPressRight", "mouthPucker", "mouthRight",
    "mouthRollLower", "mouthRollUpper", "mouthShrugLower", "mouthShrugUpper",
    "mouthSmileLeft", "mouthSmileRight", "mouthStretchLeft", "mouthStretchRight",
    "mouthUpperUpLeft", "mouthUpperUpRight",
    "noseSneerLeft", "noseSneerRight", "tongueOut",
]

# Select the inference device
device = "cuda" if torch.cuda.is_available() else "cpu"
blendshapes_path = "./blendshapes"


def get_asset(fname):
    return SVGImage.open(os.path.join(os.path.dirname(__file__), "assets", fname)).svg_content


class MainPage(Page):
    def __init__(self, model_name: str):
        super(MainPage, self).__init__()
        self.infer = PieInfer()
        self.tts = TTS("tts_models/multilingual/multi-dataset/xtts_v2").to(device)
        self.r = redis.Redis(host='localhost', port=6379, decode_responses=True)
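        # Chat history lives in Redis hashes keyed "user-session:<session_id>" with fields
        # message_0, message_1, ... (even indices are the user, odd indices the assistant);
        # message_sent() below reads and extends that hash on every turn.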
        self.aihandler = AIHandler()
        self.aihandler_stream = AIHandlerStream()
        self.aihandler_new_stream = AIHandlerNewStream()
        self.fields = Navigation(
            AjaxGroup("ChatGroup", VStack([
                HStack([
                    AjaxChatField("Chat",
                                  self.register_ajax(f"/refresh_{model_name}", self.message_sent),
                                  deps_names=["sid", "session_id", "Chat", "Chat__piedemo__file"],
                                  use_socketio_support=True,
                                  nopie=True,
                                  style={"height": "100%"}),
                    OutputColoredTextField("video", nopie=True, use_socketio_support=True),
                ], xs=[8, 4]),
                ProgressField("progress", nopie=True, use_socketio_support=True),
                InputHiddenField("session_id", None),
            ]), no_return=True),
            no_submit=True,
            page_title="MIA PIA",
            page_style={})
        self.fields.add_link("SIMPLE", "/simple", active=model_name == "render")
        self.fields.add_link("MIA PIA", "/nice", active=model_name != "render")
        self.model_name = model_name

    def get_content(self, **kwargs):
        fields = self.fields.copy()
        fields.child_loc["Chat"].set_default_options(["Hello! What is your name?",
                                                      "Say one word and stop."])
        """
        fields.child_loc["Chat"].set_avatars({
            "self": get_asset("avatar.svg"),
            "ChatGPT": get_asset("dog.svg"),
        })
        """
        session_id = str(uuid.uuid4())
        return self.fill(fields, {
            "video": f""" """,
            "session_id": session_id,
        })

    def message_sent(self, **data):
        sid = data['sid']
        self.emit(self.fields.child_loc["Chat"].clear_input(), to=sid)
        self.emit(self.fields.child_loc["video"].update(f""" """))
        data = self.parse(self.fields, data)
        session_id = data['session_id']
        messages_map = self.r.hgetall(f'user-session:{session_id}')
        messages = [self.fields.child_loc["Chat"].format_message("self" if i % 2 == 0 else "ChatGPT",
                                                                 messages_map[f"message_{i}"])
                    for i in range(len(messages_map))]
        print("history: ", messages)
        text = data['Chat']['text']
        self.emit(self.fields.child_loc["Chat"].update(messages + [
            self.fields.child_loc["Chat"].format_message("self", text),
            self.fields.child_loc["Chat"].format_message("ChatGPT", "Generating text..."),
        ]), to=sid)
        output = self.aihandler(text)
        output_text = output['text']
        output_emotion = output['emotion']
        messages_map[f"message_{len(messages)}"] = text
        messages_map[f"message_{len(messages) + 1}"] = output_text
        self.r.hset(f'user-session:{session_id}', mapping=messages_map)
        self.emit(self.fields.child_loc["Chat"].update(messages + [
            self.fields.child_loc["Chat"].format_message("self", text),
            self.fields.child_loc["Chat"].format_message("ChatGPT", "Generating audio..."),
        ]), to=sid)
        self.tts.tts_to_file(text=output_text,
                             speaker_wav="/home/ubuntu/repo/of_couse_here.wav",
                             language="en",
                             emotion=output_emotion,
                             file_path=f"./audio/{session_id}.wav")
        speech_array, sampling_rate = librosa.load(f"./audio/{session_id}.wav", sr=16000)
        output = self.infer(speech_array, sampling_rate)
        np.save(os.path.join("./audio", "{}.npy".format(session_id)), output)
        self.emit(self.fields.child_loc["Chat"].update(messages + [
            self.fields.child_loc["Chat"].format_message("self", text),
            self.fields.child_loc["Chat"].format_message("ChatGPT", "Rendering..."),
        ]), to=sid)
        n = output.shape[0]
        for i, fname in enumerate(tqdm(render_video(f"{session_id}", model_name=self.model_name), total=n)):
            print("Got frame: ", fname, file=sys.stderr)
            self.emit(self.fields.child_loc["progress"].update(100 * i // n), to=sid)
        construct_video(session_id)
        self.emit(self.fields.child_loc["video"].update(f""" """), to=sid)
        '''self.emit(self.fields.child_loc["video"].update(f""" """))'''
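        # Replace the "Rendering..." placeholder with the assistant's final reply.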
self.fields.child_loc["Chat"].format_message("ChatGPT", output_text), ]), to=sid) page = MainPage("render") web = Web({ "": "simple", "simple": page, "nice": page, "secret_demo_of_c574d349": DemoPage(), "secret_demo_of_c574dnew": DemoPage(load_new_model=True) }, use_socketio_support=True) web.add_assets_directory("IFC", "/home/ubuntu/repo/slavaBIM/data/IFC") host = '0.0.0.0' port = 8011 debug = False app = web.get_app() @app.route("/api/video/", methods=["GET"]) def get_video(session_id): return send_file("./audio/{}.mp4".format(session_id)) def gen(session_id): for image_path in render_video(f"{session_id}"): with open(image_path, 'rb') as f: yield (b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' + f.read() + b'\r\n') construct_video(session_id) @app.route("/api/video/stream/", methods=["GET"]) def get_video_async(session_id): return Response(gen(session_id), mimetype='multipart/x-mixed-replace; boundary=frame') speaker_path = "/home/ubuntu/repo/female.wav" @app.route("/api/set_speaker", methods=["POST"]) @describe(["3dmodel"], name="Set emotion for 3D model", description="""Set speaker for 3D model""", inputs={ "user_id": "This ID from article Unique Identifier for iPHONE", "speaker": "voice1 or voice2" }, outputs={ "status": "ok" }) @check_missing_keys([ ("user_id", {"status": "error", "status_code": "missing_user_id_error"}), ("speaker", {"status": "error", "status_code": "missing_emotion_error"}), ]) def set_speaker(): speaker = request.json.get("speaker") user_id = request.json.get("user_id") SPEAKER[user_id] = speaker return jsonify({ 'status': 'ok' }) @app.route("/api/set_emotion", methods=["POST"]) @describe(["3dmodel"], name="Set emotion for 3D model", description="""Set emotion for 3D model""", inputs={ "user_id": "This ID from article Unique Identifier for iPHONE", "emotion": "sad" }, outputs={ "status": "ok" }) @check_missing_keys([ ("user_id", {"status": "error", "status_code": "missing_user_id_error"}), ("emotion", {"status": "error", "status_code": "missing_emotion_error"}), ]) def set_emotion(): emotion = request.json.get("emotion") user_id = request.json.get("user_id") EMOTIONS[user_id] = emotion return jsonify({ 'status': 'ok' }) @app.route("/api/get_texts", methods=["POST"]) @describe(["text"], name="Get texts for user_id", description="""This endpoint get all texts for current iPhone""", inputs={ "user_id": "This ID from article Unique Identifier for iPHONE" }, outputs={ "text": "Output", "id": "bot or user", }) @check_missing_keys([ ("user_id", {"status": "error", "status_code": "missing_user_id_error"}), ]) def get_texts(): user_id = request.json.get("user_id") return jsonify(TEXTS[user_id]) @app.route("/api/send_text", methods=["POST"]) @describe(["text"], name="Sent text to miapia", description="""This endpoint sends texts for client""", inputs={ "text": "Hello, MIAPIA", "user_id": "This ID from article Unique Identifier for iPHONE" }, outputs={ "status": "ok" }) @check_missing_keys([ ("text", {"status": "error", "status_code": "missing_text_error"}), ("user_id", {"status": "error", "status_code": "missing_user_id_error"}), ]) def send_text(): user_id = request.json.get("user_id") text = request.json.get("text", "") TEXTS[user_id].append({ "id": 'user', "text": text }) output_texts = page.aihandler_stream(text) bot_text = "" for output_text in output_texts: bot_text += " " + output_text TEXTS[user_id].append({ "id": 'bot', "text": bot_text }) return jsonify({ "status": "ok", "messages": TEXTS[user_id] }) io = web.get_socketio(app, engineio_logger=False) 
head_memories = {}
TEXTS = defaultdict(list)
EMOTIONS = {}
SPEAKER = {}


def get_event(name, value, timestamp):
    return {
        "index": model_bsList.index(name),
        "value": value,
        "timestamp": timestamp
    }


def get_value(events, name):
    index = model_bsList.index(name)
    events = [event for event in events if event['index'] == index]
    if len(events) == 0:
        return None
    return events[-1]['value']


def get_head_memory():
    ids = [100, 101, 103, 104, 106, 107, 109, 110]
    return [[0, 0, 1] for _ in range(len(ids))]


def get_head_rotations(alpha, duration, memory, sign):
    ids = [100, 101, 103, 104, 106, 107, 109, 110]
    for _ in range(3):
        index = ids.index(random.choice(ids))
        step = 0.01 * sign[index]
        memory[index][0] += step
        memory[index][0] = min(memory[index][0], memory[index][2])
        memory[index][0] = max(memory[index][0], memory[index][1])
    print(memory)
    return [{
        "index": j,
        "value": memory[i][0],
        "timestamp": float(duration * alpha)
    } for i, j in enumerate(ids)], memory


def perform_on_text(output_text, sid, head_memory, sign, voice):
    session_id = str(uuid.uuid4())
    page.tts.tts_to_file(text=output_text,
                         speaker_wav="/home/ubuntu/repo/female.wav" if voice == "voice1" else "/home/ubuntu/repo/indian.wav",
                         language="en",
                         emotion="Happy",
                         file_path=f"./audio/{session_id}.wav")
    audio_path = f"./audio/{session_id}.wav"
    with open(audio_path, 'rb') as f:
        audio_content = f.read()
    encode_string = base64.b64encode(audio_content).decode('utf-8')
    speech_array, sampling_rate = librosa.load(audio_path, sr=16000)
    duration = librosa.get_duration(y=speech_array, sr=sampling_rate)
    output = page.infer(speech_array, sampling_rate)
    emit("io_push_audio_blob", {"dataURL": f"base64,{encode_string}"}, to=sid)
    print("Sent audio.")
    emit("io_set_size", {"size": output.shape[0]}, to=sid)
    t1 = time()
    for i in tqdm(range(output.shape[0])):
        rots, head_memory = get_head_rotations((i / output.shape[0]), duration, head_memory, sign)
        blendshapes_i = [{
            "index": j,
            "value": output[i, j],
            "timestamp": float(duration * (i / output.shape[0]))
        } for j in range(output.shape[1])] + rots
        if max([get_value(blendshapes_i, target_name) for target_name in target_names]) > 0.5:
            os.makedirs(blendshapes_path, exist_ok=True)
            save_blendshapes_i = os.path.join(blendshapes_path, str(uuid.uuid4()) + '.json')
            with open(save_blendshapes_i, 'w') as f:
                json.dump(blendshapes_i, f)
        emit("io_set_coef", blendshapes_i, to=sid)
        # sleep(0.1 * duration / output.shape[0])
    t2 = time()
    sleep(max(0., duration - (t2 - t1)))
    return head_memory


def perform_surgery(sid, duration=5):
    with open("../5-seconds-of-silence.wav", 'rb') as f:
        audio_content = f.read()
    encode_string = base64.b64encode(audio_content).decode('utf-8')
    fps = 20
    emit("io_push_audio_blob", {"dataURL": f"base64,{encode_string}"}, to=sid)
    print("Sent audio.")
    emit("io_set_size", {"size": (fps * duration)}, to=sid)
    t1 = time()
    for i in tqdm(range(fps * duration)):
        alpha = float(i / (fps * duration))
        emit("io_set_coef", [
            get_event("eyeWideLeft", 0.3 - 0.3 * alpha, float(duration * alpha)),
            get_event("eyeWideRight", 0.3 - 0.3 * alpha, float(duration * alpha))
        ], to=sid)
    t2 = time()
    sleep(max(0., duration - (t2 - t1)))


BODY_ANIMATION_DESCRIPTIONS = [
    "Get on your knees", "Show butt", "Show ass", "Pull back", "Doggy style",
    "Sit on ass", "Spread legs", "Show pussy", "Missionary position", "Let fuck",
    "Lie on side", "Show legs", "Rest", "Relax", "Sleep",
    "Stand up", "Showcase yourself", "Show yourself", "Show your boobs", "Touch your boobs",
    "Make a blowjob", "On your knees", "Sit on the dildo", "Kiss my ass", "Be on top"
]
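# The body-animation matcher below ranks the descriptions by a plain dot product against the
# query embedding. OpenAI embeddings are returned L2-normalised, so the dot product acts as a
# cosine similarity here; if that assumption ever changes, normalise explicitly, e.g.
#   sims = query_vec @ BODY_ANIMATION_EMBEDDINGS.T / (
#       np.linalg.norm(query_vec) * np.linalg.norm(BODY_ANIMATION_EMBEDDINGS, axis=-1))
# (query_vec is a hypothetical 1-D embedding of the incoming text).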
resp = page.aihandler_stream.ai.embeddings.create(input=BODY_ANIMATION_DESCRIPTIONS,
                                                  model="text-embedding-3-small")
BODY_ANIMATION_EMBEDDINGS = np.array(list(map(lambda x: x.embedding, resp.data)))


def to_back_view(sid):
    emit("io_set_view", [{"type": "back"}], to=sid)


def to_front_view(sid):
    emit("io_set_view", [{"type": "front"}], to=sid)


def to_up_to_down_view(sid):
    emit("io_set_view", [{"type": "up_to_down"}], to=sid)


@io.on("io_set_text_body")
def io_set_text_body(data):
    data = json.loads(data)
    data = data[0]
    sid = None
    print(data, file=sys.stderr)
    if "text" not in data:
        emit("io_error", {"message": "Text not found"}, to=sid)
        return
    text = data["text"]
    user_id = data.get('user_id')
    print(user_id)
    emb = page.aihandler_stream.ai.embeddings.create(input=[text],
                                                     model="text-embedding-3-small")
    idx = int(np.argmax(np.sum(np.array([emb.data[0].embedding]) * BODY_ANIMATION_EMBEDDINGS, axis=-1)))
    animations_durations = {
        1: 13,
        2: 9.33,
        3: 10.5,
        4: 7.52,
        5: 10,
    }
    print("Text: ", text, "\n",
          "Detected:", BODY_ANIMATION_DESCRIPTIONS[idx], "\n",
          "Index: ", idx, "\n")
    # to_body_view(sid)
    if 1 + idx // 5 == 1:
        to_back_view(sid)
    elif 1 + idx // 5 == 3:
        to_up_to_down_view(sid)
    else:
        to_front_view(sid)
    emit("io_set_body_animation", [{
        "index": 1 + idx // 5,
        "timestamp": 0
    }], to=sid)
    """
    start_speaking(text, user_id=user_id, sid=sid)
    """
    sleep(animations_durations[1 + idx // 5] // 2)
    to_front_view(sid)
    sleep(animations_durations[1 + idx // 5] // 2)
    emit("io_finish", {}, to=sid)


def start_speaking(text, user_id, sid):
    TEXTS[user_id].append({"id": "user", "text": text})
    voice = SPEAKER.get(user_id, "voice1")
    if sid not in head_memories:
        head_memories[sid] = get_head_memory()
    head_memory = head_memories[sid]
    # output_texts = [page.aihandler(text)['text']]
    output_texts = page.aihandler_stream(text)
    # output_texts = page.aihandler_new_stream(text)
    bot_text = ""
    for output_text in output_texts:
        sign = [2 * (random.random() > 0.5) - 1 for _ in range(8)]
        head_memory = perform_on_text(output_text, sid, head_memory, sign=sign, voice=voice)
        bot_text += " " + output_text
    print("SURGERY STARTED!")
    # perform_surgery(sid)
    print("SURGERY ENDED!")
    TEXTS[user_id].append({"id": "bot", "text": bot_text})


@io.on("io_set_text")
def io_set_text(data):
    data = json.loads(data)
    data = data[0]
    sid = None
    print(data, file=sys.stderr)
    if "text" not in data:
        emit("io_error", {"message": "Text not found"}, to=sid)
        return
    text = data["text"]
    """
    if "user_id" not in data:
        emit("io_error", {"message": "User not found"}, to=sid)
        return
    """
    user_id = data.get('user_id')
    print(user_id)
    start_speaking(text, user_id=user_id, sid=sid)
    emit("io_finish", {}, to=sid)


io.run(app, host=host, port=port, debug=debug, allow_unsafe_werkzeug=True)
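# Illustrative Socket.IO client call (names and user_id value are examples): the "io_set_text"
# and "io_set_text_body" handlers above expect a JSON-encoded list containing a single object.
#
#   import socketio, json
#   client = socketio.Client()
#   client.connect("http://localhost:8011")
#   client.emit("io_set_text", json.dumps([{"text": "Hello!", "user_id": "demo-user"}]))
#
# The server responds with "io_push_audio_blob", "io_set_size" and a stream of "io_set_coef"
# blendshape events, and signals completion with "io_finish".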