""" 图片生成API接口 """ import json import base64 import requests import re import logging import os import random from flask import Blueprint, request, jsonify, Response from backend.utils.model_client import NanoBananaClient from backend.models.database import save_generation_history # 配置日志 logger = logging.getLogger(__name__) # 获取项目根目录 project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) def load_advanced_random_options(): """ 从JSON文件加载高级随机词条配置 Returns: dict: 包含各类别词条的字典 """ try: options_path = os.path.join(project_root, 'model_prompt', 'advanced_random_options.json') with open(options_path, 'r', encoding='utf-8') as f: return json.load(f) except Exception as e: logger.error(f"加载高级随机词条配置失败: {str(e)}") return {} def load_advanced_random_selection(): """ 从JSON文件加载高级随机勾选状态配置 Returns: dict: 包含勾选状态的字典,格式为: - 数组类型类别: { "类别名": ["已勾选的词条1", "已勾选的词条2", ...] } - 字符串类型类别: { "类别名": true/false } """ try: selection_path = os.path.join(project_root, 'model_prompt', 'advanced_random_selection.json') if not os.path.exists(selection_path): # 如果勾选状态文件不存在,返回None表示默认全选 return None with open(selection_path, 'r', encoding='utf-8') as f: return json.load(f) except Exception as e: logger.error(f"加载高级随机勾选状态失败: {str(e)}") return None def generate_random_prompt_suffix(): """ 从配置文件中随机选择词条并拼接成提示词后缀 规则: - 根据勾选状态筛选可选词条 - 对于数组类型的类别,在已勾选的词条中随机选择1个 - 对于字符串类型的类别(如"风格"),如果被勾选则直接使用整个字符串 - 如果某个类别没有勾选任何词条,则跳过该类别 Returns: str: 拼接后的提示词后缀 """ options = load_advanced_random_options() if not options: logger.warning("高级随机词条配置为空") return "" # 加载勾选状态 selection = load_advanced_random_selection() parts = [] for category, items in options.items(): if isinstance(items, list) and len(items) > 0: # 数组类型,根据勾选状态筛选可选词条 if selection is not None: # 有勾选状态配置 selected_items = selection.get(category, []) if isinstance(selected_items, list) and len(selected_items) > 0: # 只在已勾选的词条中随机 # 同时确保词条在原始配置中存在 available_items = [item for item in selected_items if item in items] if available_items: selected = random.choice(available_items) parts.append(f"{category}:{selected}") logger.debug(f"高级随机 - {category}: {selected} (从{len(available_items)}个勾选项中)") else: logger.debug(f"高级随机 - {category}: 跳过(无有效勾选项)") else: # 该类别没有勾选任何词条,跳过 logger.debug(f"高级随机 - {category}: 跳过(未勾选)") else: # 没有勾选状态配置,默认全选 selected = random.choice(items) parts.append(f"{category}:{selected}") logger.debug(f"高级随机 - {category}: {selected}") elif isinstance(items, str) and items: # 字符串类型 if selection is not None: # 检查是否被勾选 is_selected = selection.get(category, False) if is_selected: parts.append(f"{category}:{items}") logger.debug(f"高级随机 - {category}: {items}") else: logger.debug(f"高级随机 - {category}: 跳过(未勾选)") else: # 没有勾选状态配置,默认使用 parts.append(f"{category}:{items}") logger.debug(f"高级随机 - {category}: {items}") if parts: suffix = ",".join(parts) logger.info(f"高级随机词条拼接完成,共{len(parts)}个类别") return suffix return "" def process_advanced_random(prompt, enabled): """ 处理高级随机功能 当启用高级随机时,从配置文件随机选择词条并附加到提示词后面 Args: prompt: 用户原始提示词 enabled: 是否启用高级随机 Returns: str: 处理后的提示词 """ if not enabled: return prompt logger.info("检测到启用高级随机功能,开始处理...") random_suffix = generate_random_prompt_suffix() if random_suffix: processed_prompt = f"{prompt}\n\n{random_suffix}" logger.info(f"高级随机处理完成,已添加随机词条后缀") return processed_prompt return prompt def process_character_linyue(prompt, reference_images): """ 处理"林月"角色一致性 当用户提示词中包含"林月"时,静默完成以下操作: 1. 将林月的参考图片插入到参考图片列表最前面 2. 在用户提示词末尾追加林月的角色设定 3. 在合并后的提示词末尾加上一致性指令 Args: prompt: 用户原始提示词 reference_images: 用户上传的参考图片列表 Returns: (processed_prompt, processed_images): 处理后的提示词和参考图片列表 """ # 检测提示词中是否包含"林月" if "林月" not in prompt: return prompt, reference_images logger.info("检测到'林月'角色,开始处理角色一致性...") processed_prompt = prompt processed_images = reference_images.copy() if reference_images else [] # 1. 读取林月的参考图片并插入到最前面 try: linyue_image_path = os.path.join(project_root, 'model_prompt', 'linyue', 'linyue.png') with open(linyue_image_path, 'rb') as f: image_data = f.read() linyue_image_base64 = f"data:image/png;base64,{base64.b64encode(image_data).decode('utf-8')}" # 将林月图片插入到最前面 processed_images.insert(0, linyue_image_base64) logger.info(f"已将林月参考图片插入到第一位,当前参考图片数量: {len(processed_images)}") except Exception as e: logger.error(f"读取林月参考图片失败: {str(e)}") # 2. 读取林月的角色设定并追加到提示词末尾 try: linyue_prompt_path = os.path.join(project_root, 'model_prompt', 'linyue', 'linyue.txt') with open(linyue_prompt_path, 'r', encoding='utf-8') as f: linyue_character_setting = f.read().strip() # 追加角色设定到提示词末尾 processed_prompt = f"{processed_prompt}\n\n{linyue_character_setting}" logger.info("已将林月角色设定追加到提示词末尾") except Exception as e: logger.error(f"读取林月角色设定失败: {str(e)}") # 3. 在合并后的提示词末尾加上一致性指令 consistency_instruction = "保持参考图片1中的人物一致性,保持参考图片1中的人物五官一致性" processed_prompt = f"{processed_prompt}\n\n{consistency_instruction}" logger.info("已添加人物一致性指令") logger.info(f"林月角色处理完成,最终参考图片数量: {len(processed_images)}") return processed_prompt, processed_images generate_bp = Blueprint('generate', __name__) @generate_bp.route('/api/generate', methods=['POST']) def generate_images(): """ 生成图片接口 请求参数: - prompt: 提示词 - count: 图片数量 (1-10) - model: 模型名称 - temperature: 创意度参数 (0-1) - reference_images: 参考图片base64数组 (可选,最多3张) - advanced_random: 是否启用高级随机 (可选,默认false) """ try: data = request.get_json() # 参数验证 prompt = data.get('prompt', '').strip() if not prompt: logger.error("提示词为空") return jsonify({'error': '提示词不能为空'}), 400 count = data.get('count', 1) if not isinstance(count, int) or count < 1 or count > 10: logger.error(f"图片数量无效: {count}") return jsonify({'error': '图片数量必须在1-10之间'}), 400 model = data.get('model', 'gemini-3-pro-image-preview') temperature = data.get('temperature', 0.7) # 高级随机参数 advanced_random = data.get('advanced_random', False) # 支持多张参考图片 reference_images = data.get('reference_images', []) if not isinstance(reference_images, list): reference_images = [] # 保存原始提示词(用于高级随机时每次独立随机) base_prompt = prompt # 如果未启用高级随机,则在这里处理林月角色一致性(只处理一次) # 如果启用了高级随机,则在每个线程内部处理 if not advanced_random: # 处理"林月"角色一致性(在参考图片数量限制之前处理) prompt, reference_images = process_character_linyue(prompt, reference_images) # 限制最多3张参考图片(林月图片算一张) if len(reference_images) > 3: reference_images = reference_images[:3] logger.warning("参考图片超过3张,已截取前3张") logger.info(f"开始生成 {count} 张图片: {prompt[:50]}...") if reference_images: logger.info(f"使用 {len(reference_images)} 张参考图片") # 创建模型客户端 client = NanoBananaClient() def generate(): """生成器函数,支持并发流式响应""" import threading import queue import time results = {} result_queue = queue.Queue() completed_count = 0 def generate_single_image(index): """单个图片生成任务""" import threading thread_id = threading.current_thread().ident logger.info(f"[线程{thread_id}] 开始生成第 {index + 1} 张图片...") try: # 当前任务使用的提示词和参考图片 current_prompt = prompt current_reference_images = reference_images # 如果启用了高级随机,每张图片独立随机一次 if advanced_random: current_prompt = process_advanced_random(base_prompt, True) # 高级随机模式下也需要处理林月角色一致性 current_prompt, current_reference_images = process_character_linyue( current_prompt, reference_images.copy() if reference_images else [] ) # 限制参考图片数量 if len(current_reference_images) > 3: current_reference_images = current_reference_images[:3] logger.info(f"[线程{thread_id}] 高级随机模式,第 {index + 1} 张图片使用独立随机参数") # 发送生成请求 if current_reference_images and len(current_reference_images) > 0: # 图片编辑模式(支持多张参考图片) logger.info(f"[线程{thread_id}] 使用图片编辑模式生成第 {index + 1} 张图片,参考图片数量: {len(current_reference_images)}") image_data = client.generate_with_images(current_prompt, current_reference_images, temperature) else: # 文生图模式 logger.info(f"[线程{thread_id}] 使用文生图模式生成第 {index + 1} 张图片") image_data = client.generate_text_to_image(current_prompt, temperature) if image_data: logger.info(f"[线程{thread_id}] 第 {index + 1} 张图片生成成功") result = { 'index': index, 'status': 'success', 'image_data': image_data, 'timestamp': client.get_timestamp() } else: logger.error(f"[线程{thread_id}] 第 {index + 1} 张图片生成失败:未返回图片数据") result = { 'index': index, 'status': 'error', 'error': '图片生成失败', 'timestamp': client.get_timestamp() } except Exception as e: logger.error(f"[线程{thread_id}] 第 {index + 1} 张图片生成异常: {str(e)}") result = { 'index': index, 'status': 'error', 'error': str(e), 'timestamp': client.get_timestamp() } # 将结果放入队列 result_queue.put(result) # 启动并发任务 threads = [] for i in range(count): thread = threading.Thread(target=generate_single_image, args=(i,)) thread.daemon = True thread.start() threads.append(thread) # 处理结果并实时返回 while completed_count < count: try: # 等待结果,设置超时避免死锁 result = result_queue.get(timeout=1.0) results[result['index']] = result completed_count += 1 # 立即返回结果 json_str = json.dumps(result, ensure_ascii=False) yield f"data: {json_str}\n\n" except queue.Empty: # 超时,检查线程状态 alive_threads = [t for t in threads if t.is_alive()] if not alive_threads: # 所有线程都结束了,跳出循环 break continue # 等待所有线程完成 for thread in threads: thread.join(timeout=5.0) # 最多等待5秒 # 处理队列中剩余的结果 while not result_queue.empty(): try: result = result_queue.get_nowait() if result['index'] not in results: results[result['index']] = result completed_count += 1 json_str = json.dumps(result, ensure_ascii=False) yield f"data: {json_str}\n\n" except queue.Empty: break # 保存生成历史 success_count = len([r for r in results.values() if r['status'] == 'success']) logger.info(f"生成完成: {success_count}/{count} 成功") try: save_generation_history(prompt, model, count, success_count) except Exception as e: logger.error(f"保存生成历史失败: {str(e)}") # 发送完成信号 final_result = { 'status': 'completed', 'total': count, 'success': success_count, 'failed': count - success_count } json_str = json.dumps(final_result, ensure_ascii=False) yield f"data: {json_str}\n\n" return Response(generate(), mimetype='text/event-stream') except Exception as e: logger.error(f"图片生成API异常: {str(e)}") return jsonify({'error': f'服务器错误: {str(e)}'}), 500