支持私有化部署
AI知识库

53AI知识库

学习大模型的前沿技术与行业应用场景


AI 新闻小助手 100% 纯提示词实践

发布日期:2025-05-27 07:31:26 浏览次数: 1552 作者:云原架构 CNA
推荐语

将枯燥的RSS新闻转化为生动有趣的公众号内容,AI新闻小助手让创作变得简单而高效。

核心内容:
1. AI新闻小助手的创意转化功能与参数调整
2. 工具的“多重人格”与不同风格的内容输出
3. 技术实现细节与实际体验的专业与通俗并存

杨芳贤
53A创始人/腾讯云(TVP)最具价值专家


如下这个过程是从提示词提供功能需求到适当参与 Debug 到整理 Blog 文字的全过程最后给的结果。


你是否曾经为了准备一篇AI行业的推送而熬夜搜索资料?或者为了让技术文章既专业又不失趣味性而抓耳挠腮?今天,我们来聊聊这个能把干巴巴的RSS新闻变成精彩公众号内容的小工具!


从机械搬运到创意转化


RSS订阅源就像一个不苟言笑的图书管理员,只会机械地把新闻递给你:"这是今天的新闻,拿去看吧"——没有解释,没有分析,更没有个性。


而我们的"AI新闻小助手"则像一位能说会道的专栏作家,在保证信息准确性的基础上,为内容注入生命力。最妙的是,你可以通过调整`temperature`参数,精确控制创意与稳定的平衡点:低温确保内容严谨不跑偏,高温则让表达更加活泼多样。


三种人格,满足多元需求


这个工具最有趣的地方在于它的"多重人格",每种都体现了稳定与创意的不同平衡:


  • 日常简报人格:像科技博主一样用轻松的语气聊AI新闻,保持内容准确的同时,加入亲切表达和emoji

  • 深度分析人格:像行业分析师一样剖析技术本质和市场影响,在稳定输出专业洞见的基础上,加入独到见解

  • 小白翻译人格:能把"Mixture of Experts"解释成"多位专家合伙开诊所"这样通俗易懂的概念,最大化创意表达,但确保技术概念被准确解释


无论选择哪种风格,系统都能在每次运行时提供结构一致的输出,让你的公众号风格稳定又不乏惊喜。


技术实现:不只是简单的API调用


这个工具的灵魂在于健壮性工程,我们精心设计了多重机制:


  • 重试机制:网络不稳定?不要慌,我们自动重试,确保创作流程不中断

  • 缓存系统:为什么要反复下载同一个RSS?聪明的缓存帮你省时间

  • 流式输出:像看电影预告片一样,实时观察创意生成过程,随时调整方向

  • 配置灵活:命令行、环境变量、配置文件多种方式,轻松调整系统行为而不改动代码

  • 异常处理:从容应对各种意外情况,保证系统稳定运行


实际体验:专业与通俗并存


想象一下,当Google发布了"Gemini 2.5 Pro with DeepThink reasoning"这样的术语,经过我们的小白人格翻译后,它变成了:


> Gemini 2.5 Pro就像是一个超级聪明的AI朋友,能帮你写作业、回答问题、陪你聊天。而DeepThink模式则是让这个朋友具备了深度思考能力,不只是回答"是什么",还能解释"为什么"。


在实际应用中,你会发现:


> 每天的推送都保持着结构化的框架和专业水准,读者能形成稳定的阅读预期——但内容表达和视角解读每次都有新鲜感,让人期待明天的更新。


创意与规则的平衡艺术


这个工具的精髓在于,它理解创意不是无序的发散,而是在稳定框架内的有序创新。就像爵士乐的即兴演奏,看似自由奔放,实则遵循着严格的和声规则。


有了它,你不再需要"懂技术"也能写出专业、有趣、易懂的AI行业资讯。当你的读者说"终于看懂了什么是AI大模型"时,那种成就感,比写代码调Bug爽多了!


你的公众号运营,从此告别"技术派"和"科普派"的两难选择,也不必在稳定输出和创意表达之间痛苦取舍——因为现在,你可以同时拥有这一切!在AI时代,我们终于可以鱼与熊掌兼得。


#!/usr/bin/env python3# -*- coding: utf-8 -*-
import feedparserimport datetimeimport requestsimport osimport jsonimport timeimport loggingimport tracebackimport argparseimport configparserfrom pathlib import Pathfrom datetime import timedeltafrom functools import wrapsfrom typing import ListDictAnyOptionalCallable, TypeVar, Union
# 设置日志logging.basicConfig(    level=logging.INFO,    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',    handlers=[        logging.FileHandler("ai_news_generator.log"),        logging.StreamHandler()    ])logger = logging.getLogger("ai_news_generator")
# 类型声明T = TypeVar('T')FeedEntry = Dict[strAny]ApiResponse = Dict[strAny]
# 默认配置DEFAULT_CONFIG = {    "api": {        "base_url""https://xxxxxxxxxxxxx/openai",        "model""xxxxxxxxx",        "api_key": os.environ.get("OPENAI_API_KEY"""),        "max_tokens"2000,        "timeout"60,        "temperature"0.7    },    "rss": {        "url""https://news.smol.ai/rss.xml",        "days"7,        "cache_time"3600  # 缓存RSS内容的时间(秒)    },    "output": {        "directory""ai_news_output",        "format""markdown"    }}
# 重试装饰器def retry(max_attempts: int = 3, delay: int = 2, backoff: int = 2          exceptions: tuple = (Exception,)) -> Callable:    """    重试装饰器,用于处理可能失败的操作
    参数:        max_attempts: 最大尝试次数        delay: 初始延迟时间(秒)        backoff: 延迟的倍数(指数退避)        exceptions: 要捕获的异常元组    """    def decorator(func):        @wraps(func)        def wrapper(*args, **kwargs):            attempt = 0            current_delay = delay
            while attempt < max_attempts:                try:                    return func(*args, **kwargs)                except exceptions as e:                    attempt += 1                    if attempt == max_attempts:                        logger.error(f"最大尝试次数已用完 ({max_attempts}),操作失败: {e}")                        raise
                    logger.warning(f"尝试 {attempt}/{max_attempts} 失败: {e}. "                                  f"将在 {current_delay} 秒后重试...")                    time.sleep(current_delay)                    current_delay *= backoff
        return wrapper    return decorator
class Config:    """配置管理类"""
    def __init__(self, config_file: Optional[str] = None):        """        初始化配置
        参数:            config_file: 配置文件路径,如果不存在则使用默认配置        """        self.config = DEFAULT_CONFIG.copy()
        if config_file and os.path.exists(config_file):            self._load_from_file(config_file)        else:            logger.info("未找到配置文件,使用默认配置")
        # 环境变量优先级高于配置文件        if os.environ.get("OPENAI_API_KEY"):            self.config["api"]["api_key"] = os.environ.get("OPENAI_API_KEY")
    def _load_from_file(self, config_file: str) -> None:        """从文件加载配置"""        try:            parser = configparser.ConfigParser()            parser.read(config_file)
            # 将配置文件值更新到默认配置            for section in parser.sections():                if section in self.config:                    for key, value in parser.items(section):                        # 尝试转换类型以匹配默认配置                        if key in self.config[section]:                            original_type = type(self.config[section][key])                            if original_type is int:                                self.config[section][key] = int(value)                            elif original_type is float:                                self.config[section][key] = float(value)                            elif original_type is bool:                                self.config[section][key] = value.lower() in ("true""yes""1")                            else:                                self.config[section][key] = value
            logger.info(f"从 {config_file} 加载配置")        except Exception as e:            logger.error(f"加载配置文件出错: {e}")
    def save_config(self, file_path: str) -> None:        """将当前配置保存到文件"""        try:            parser = configparser.ConfigParser()
            for section, options in self.config.items():                parser.add_section(section)                for key, value in options.items():                    parser.set(section, key, str(value))
            with open(file_path, 'w'as f:                parser.write(f)
            logger.info(f"配置已保存到 {file_path}")        except Exception as e:            logger.error(f"保存配置文件出错: {e}")
    def get(self, section: str, key: str, default: Any = None) -> Any:        """获取配置值,如果不存在则返回默认值"""        try:            return self.config[section][key]        except KeyError:            logger.warning(f"配置 {section}.{key} 不存在,使用默认值: {default}")            return default
class RssReader:    """RSS订阅内容读取类"""
    def __init__(self, config: Config):        """        初始化RSS读取器
        参数:            config: 配置对象        """        self.config = config        self.cache = {}        self.cache_time = {}
    @retry(max_attempts=3, exceptions=(requests.RequestException,))    def fetch_rss_feed(self, url: Optional[str] = None) -> Optional[bytes]:        """        获取RSS订阅内容
        参数:            url: RSS订阅地址,如果为None则使用配置中的地址
        返回:            RSS内容或None(如果获取失败)        """        url = url or self.config.get("rss""url")        cache_time = self.config.get("rss""cache_time")
        # 检查缓存        if url in self.cache and url in self.cache_time:            if time.time() - self.cache_time[url] < cache_time:                logger.debug(f"使用缓存的RSS内容: {url}")                return self.cache[url]
        try:            logger.info(f"获取RSS订阅: {url}")            response = requests.get(url, timeout=10)            response.raise_for_status()
            # 更新缓存            self.cache[url] = response.content            self.cache_time[url] = time.time()
            return response.content        except requests.exceptions.RequestException as e:            logger.error(f"获取RSS失败: {e}")            raise
    def parse_feed(self, content: Optional[bytes]) -> Optional[feedparser.FeedParserDict]:        """        解析RSS内容
        参数:            content: RSS内容
        返回:            解析后的Feed对象或None(如果解析失败)        """        if not content:            logger.error("没有内容可以解析")            return None
        try:            return feedparser.parse(content)        except Exception as e:            logger.error(f"解析RSS内容失败: {e}")            return None
    def get_recent_entries(self, feed: Optional[feedparser.FeedParserDict],                            days: Optional[int] = None) -> List[FeedEntry]:        """        获取最近n天的订阅内容
        参数:            feed: 解析后的Feed对象            days: 天数,如果为None则使用配置中的值
        返回:            最近的条目列表        """        if not feed:            logger.warning("没有Feed可以获取条目")            return []
        days = days or self.config.get("rss""days")        now = datetime.datetime.now()        cutoff_date = now - timedelta(days=days)        recent_entries = []
        logger.info(f"获取最近 {days} 天的订阅内容")
        for entry in feed.entries:            # 解析发布日期            pub_date = None
            if hasattr(entry, 'published_parsed'and entry.published_parsed:                pub_date = datetime.datetime(*entry.published_parsed[:6])            elif hasattr(entry, 'updated_parsed'and entry.updated_parsed:                pub_date = datetime.datetime(*entry.updated_parsed[:6])            else:                # 如果没有日期信息,跳过该条目                logger.debug(f"跳过没有日期信息的条目: {entry.get('title''Unknown')}")                continue
            # 只保留最近n天的内容            if pub_date >= cutoff_date:                recent_entries.append({                    'title': entry.title,                    'link': entry.link,                    'published': pub_date.strftime('%Y-%m-%d %H:%M:%S'),                    'summary': entry.summary if hasattr(entry, 'summary'else "无摘要",                })
        logger.info(f"找到 {len(recent_entries)} 篇最近的文章")        return recent_entries
    def display_entries(self, entries: List[FeedEntry]) -> None:        """        显示条目内容
        参数:            entries: 条目列表        """        if not entries:            logger.info("没有找到最近的文章")            print("没有找到最近的文章")            return
        print(f"找到 {len(entries)} 篇最近的文章:")        print("-" * 80)
        for i, entry in enumerate(entries, 1):            print(f"{i}{entry['title']}")            print(f"   发布时间: {entry['published']}")            print(f"   链接: {entry['link']}")            print(f"   摘要: {entry['summary'][:200]}...")  # 只显示部分摘要            print("-" * 80)
class ContentGenerator:    """内容生成类"""
    def __init__(self, config: Config):        """        初始化内容生成器
        参数:            config: 配置对象        """        self.config = config        self._load_prompt_templates()
    def _load_prompt_templates(self) -> None:        """加载提示词模板"""        self.prompt_templates = {            "daily""""你是国内顶尖的AI科技公众号编辑,擅长将复杂技术新闻转化为通俗易懂的内容。
请将提供的AI技术新闻整理成一篇微信公众号"每日AI简报",遵循以下要求:
【内容要求】1. 使用标题"【每日AI简报】YYYY年MM月DD日",自动替换为当前日期2. 开头用2-3句话总结今日AI领域的整体趋势或亮点3. 为每条新闻设计简短醒目的小标题,形式为"【关键词】+核心内容"4. 每条新闻包含:   - 事件概述(用最简单的话解释发生了什么)   - 为什么重要(对普通用户或行业的影响)   - 相关背景(如必要,2-3句话解释关键技术概念)
【表达风格】1. 像"科技博主"而非"新闻记者"的语气,亲切自然2. 使用生动的类比和比喻解释技术概念3. 适当使用emoji增强表达(每段1-2个,不要过多)4. 避免专业术语堆砌,必须使用时提供简明解释5. 用"你"直接对读者说话,增强亲近感
【格式规范】1. 通篇采用markdown格式2. 每条新闻之间用分隔线或明显标题区分3. 重点信息可用加粗、斜体强调4. 总篇幅控制在1000-1500字之间5. 结尾添加"感谢阅读,明天见~"和订阅引导
记住:写作目标是让"对AI感兴趣但没有技术背景的普通用户"轻松理解这些技术进展的价值和意义。""",            "deep""""你是一位资深AI领域分析师,擅长深入剖析技术进展和市场影响。
请将提供的AI技术新闻整理成一篇微信公众号"AI技术深度解析",遵循以下要求:
【内容架构】1. 开篇:用简明语言概述本期新闻焦点,指出共同趋势或主题2. 分析框架:将新闻按技术类别或应用领域分组(如LLM进展、多模态、AI应用等)3. 每则新闻包含:   - 技术本质解析(这项技术/产品的核心机制是什么)   - 进步点评估(与现有技术相比有何突破)   - 行业影响分析(将如何改变相关行业格局)   - 技术路线判断(代表了什么发展方向)
【深度化处理】1. 剖析核心技术原理,但使用通俗类比让非专业人士理解2. 关联行业背景和商业模式,解释为何重要3. 适当引入相关技术发展历史和竞争格局4. 对技术发展方向做出有见地的推测
【表达规范】1. 保持客观专业的分析语气,但避免学术化晦涩表达2. 使用结构化段落和子标题保证清晰度3. 复杂概念用图示类比或拆解方式解释4. 适当引用数据或趋势支持分析
最终成文应当让读者不仅了解"发生了什么",更理解"为什么重要"及"未来走向",体现你的专业洞察。""",            "beginner""""你是一位极擅长技术科普的AI科技博主,你的超能力是把最前沿的AI技术解释得让初中生都能理解。
请将提供的AI技术新闻整理成一篇面向完全零基础读者的微信公众号"AI新手村日报",遵循以下要求:
【零门槛原则】1. 假设读者从未接触过AI/ML相关概念,需要从零开始解释2. 每个技术术语第一次出现时必须立即用括号给出"小白解释"3. 使用日常生活中的具体例子和类比解释每个概念4. 把复杂的技术进展转化为"这对你的生活意味着什么"
【内容结构】1. 开场白:友好问候并用一句话概括"今天AI界发生了什么有趣的事"2. 新闻主体:每条新闻使用"你知道吗?"或"想象一下"等引导式开头3. 每则新闻拆解为:   - 这是什么?(用最简单的类比解释)   - 为什么很酷?(用日常场景展示应用)   - 小贴士:提供1-2个延伸知识点,但保持简单
【表达特色】1. 使用轻松愉快的对话式语气,仿佛朋友间聊天2. 丰富使用emoji表情和生动比喻3. 适当加入幽默元素,让技术内容变得有趣4. 使用"想象一下..."、"就好比..."等引导式表达5. 问答形式展开解释,预设读者可能的疑问并回答
【视觉辅助】1. 建议在正文中穿插使用简单示意图的位置标记2. 关键概念用粗体标记3. 使用项目符号和短段落提高可读性
记住:如果一个10岁孩子都能听懂你的解释,那你就成功了!"""        }
    def _prepare_input_content(self, entries: List[FeedEntry]) -> str:        """        准备输入内容
        参数:            entries: 条目列表
        返回:            格式化的输入内容        """        input_content = "以下是最近的AI技术新闻动态,请帮我整理成适合微信公众号的每日AI News推送:\n\n"
        for entry in entries:            input_content += f"标题: {entry['title']}\n"            input_content += f"时间: {entry['published']}\n"            input_content += f"链接: {entry['link']}\n"            input_content += f"摘要: {entry['summary']}\n\n"
        return input_content
    @retry(max_attempts=3, delay=5, exceptions=(requests.RequestException,))    def _call_api(self, messages: List[Dict[strstr]], stream: bool = False) -> Union[str, requests.Response]:        """        调用API
        参数:            messages: 消息列表            stream: 是否流式输出
        返回:            生成的内容或流式响应对象        """        base_url = self.config.get("api""base_url")        model = self.config.get("api""model")        api_key = self.config.get("api""api_key")        timeout = self.config.get("api""timeout")        max_tokens = self.config.get("api""max_tokens")        temperature = self.config.get("api""temperature")
        if not api_key:            raise ValueError("API密钥不能为空")
        # 构建请求数据        payload = {            "model": model,            "messages": messages,            "temperature": temperature,            "max_tokens": max_tokens,            "stream": stream        }
        # 设置请求头        headers = {            "Content-Type""application/json",            "Authorization"f"Bearer {api_key}"        }
        # API 端点        endpoint = f"{base_url}/chat/completions"
        logger.info(f"调用 API: {endpoint}, 流式输出: {stream}")
        if stream:            response = requests.post(                endpoint,                headers=headers,                json=payload,                stream=True,                timeout=timeout            )        else:            response = requests.post(                endpoint,                headers=headers,                json=payload,                timeout=timeout            )
        # 检查响应状态        if response.status_code != 200:            error_msg = f"API 请求失败: 状态码 {response.status_code}, 错误信息: {response.text}"            logger.error(error_msg)            raise requests.RequestException(error_msg)
        if stream:            return response        else:            result = response.json()            if "choices" not in result or not result["choices"]:                raise ValueError("API响应格式错误,找不到内容")            return result["choices"][0]["message"]["content"]
    def format_with_openai(self, entries: List[FeedEntry], style: str = "daily"                           stream: bool = False) -> Optional[str]:        """        使用OpenAI API对内容进行格式化与润色
        参数:            entries: RSS条目列表            style: 输出风格,可选 'daily'(日常简报), 'deep'(深度解析), 'beginner'(小白友好)            stream: 是否使用流式输出
        返回:            格式化后的内容或None(如果失败)        """        if not entries:            logger.warning("没有找到最近的文章可以润色")            return "没有找到最近的文章可以润色"
        try:            # 准备输入内容            input_content = self._prepare_input_content(entries)
            # 选择对应风格的提示词            system_prompt = self.prompt_templates.get(style, self.prompt_templates["daily"])
            # 构建消息            messages = [                {"role""system""content": system_prompt},                {"role""user""content": input_content}            ]
            # 调用API            if stream:                # 流式处理                print("正在生成内容,请稍候...")
                response = self._call_api(messages, stream=True)
                # 处理流式响应                formatted_content = []                client = None
                try:                    # 先尝试导入sseclient,失败则使用自定义解析                    import sseclient                    client = sseclient.SSEClient(response)
                    for event in client.events():                        if event.data != "[DONE]":                            try:                                chunk = json.loads(event.data)                                content = chunk.get("choices", [{}])[0].get("delta", {}).get("content""")                                if content:                                    print(content, end="")                                    formatted_content.append(content)                            except Exception as e:                                logger.warning(f"解析事件失败: {e}")                except ImportError:                    # 手动解析SSE                    logger.info("未安装sseclient,使用自定义SSE解析")                    for line in response.iter_lines():                        if line:                            line = line.decode('utf-8')                            if line.startswith('data: '):                                data = line[6:]                                if data == "[DONE]":                                    break                                try:                                    chunk = json.loads(data)                                    content = chunk.get("choices", [{}])[0].get("delta", {}).get("content""")                                    if content:                                        print(content, end="")                                        formatted_content.append(content)                                except Exception as e:                                    logger.warning(f"解析事件失败: {e}")
                formatted_content = "".join(formatted_content)                print("\n\n生成完成!")            else:                # 非流式处理                formatted_content = self._call_api(messages, stream=False)
            # 添加元数据            metadata = {                "source": self.config.get("rss""url"),                "processed_date": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),                "article_count"len(entries),                "style": style            }
            # 添加元数据到内容顶部作为YAML前置元数据            yaml_metadata = "---\n"            for key, value in metadata.items():                yaml_metadata += f"{key}{value}\n"            yaml_metadata += "---\n\n"
            return yaml_metadata + formatted_content
        except Exception as e:            logger.error(f"内容生成失败: {e}")            logger.debug(traceback.format_exc())            return None
    def save_to_file(self, content: Optional[str], filename: Optional[str] = None) -> Optional[str]:        """        将内容保存到文件
        参数:            content: 要保存的内容            filename: 文件名,如果为None则使用当前日期生成
        返回:            保存的文件路径或None(如果失败)        """        if not content:            logger.warning("没有内容可保存")            return None
        # 确保输出目录存在        output_dir = self.config.get("output""directory")        os.makedirs(output_dir, exist_ok=True)
        # 如果未指定文件名,使用当前日期        if not filename:            today = datetime.datetime.now().strftime("%Y-%m-%d")            filename = f"ai_news_{today}.md"
        # 确保文件路径        file_path = os.path.join(output_dir, filename)
        try:            with open(file_path, 'w', encoding='utf-8'as f:                f.write(content)            logger.info(f"内容已保存到 {file_path}")            return file_path        except Exception as e:            logger.error(f"保存文件失败: {e}")            return None
def parse_arguments() -> argparse.Namespace:    """解析命令行参数"""    parser = argparse.ArgumentParser(description="AI News生成器")
    parser.add_argument("--config""-c"type=strhelp="配置文件路径")    parser.add_argument("--rss""-r"type=strhelp="RSS订阅地址")    parser.add_argument("--days""-d"type=inthelp="获取最近几天的内容")    parser.add_argument("--style""-s"type=str, choices=["daily""deep""beginner"],                         default="daily"help="生成内容的风格")    parser.add_argument("--stream", action="store_true"help="使用流式输出")    parser.add_argument("--output""-o"type=strhelp="输出文件路径")    parser.add_argument("--verbose""-v", action="store_true"help="显示详细日志")
    return parser.parse_args()
def main() -> None:    """主函数"""    # 解析命令行参数    args = parse_arguments()
    # 设置日志级别    if args.verbose:        logger.setLevel(logging.DEBUG)
    # 加载配置    config = Config(args.config)
    # 如果命令行参数提供了值,则覆盖配置    if args.rss:        config.config["rss"]["url"] = args.rss    if args.days:        config.config["rss"]["days"] = args.days
    # 初始化RSS读取器和内容生成器    rss_reader = RssReader(config)    content_generator = ContentGenerator(config)
    # 获取RSS内容    rss_url = config.get("rss""url")    days = config.get("rss""days")
    logger.info(f"开始处理,获取 {rss_url} 最近 {days} 天的内容...")    print(f"正在获取 {rss_url} 最近 {days} 天的内容...")
    try:        # 获取并解析RSS        content = rss_reader.fetch_rss_feed()        feed = rss_reader.parse_feed(content)
        if not feed:            logger.error("无法解析RSS内容")            print("无法解析RSS内容")            return
        # 获取最近的条目        recent_entries = rss_reader.get_recent_entries(feed, days)        rss_reader.display_entries(recent_entries)
        if not recent_entries:            logger.warning("没有找到最近的文章")            return
        # 如果命令行没有指定风格和流式输出,则交互式询问        style = args.style        stream = args.stream
        if not args.style and not sys.argv[1:]:  # 如果没有提供任何命令行参数            print("\n选择内容润色风格:")            print("1. 日常简报风格 (默认,适合一般读者)")            print("2. 深度分析风格 (包含更多技术和市场分析)")            print("3. 小白友好风格 (零基础读者也能轻松理解)")            style_choice = input("请选择 (1-3,默认1): ").strip() or "1"
            style_options = {                "1""daily",                "2""deep",                "3""beginner"            }            style = style_options.get(style_choice, "daily")
            print("\n是否使用流式输出? (实时显示生成过程)")            print("1. 是 - 实时显示生成过程")            print("2. 否 - 等待完整生成后显示")            stream_choice = input("请选择 (1-2,默认2): ").strip() or "2"            stream = stream_choice == "1"
        # 使用OpenAI生成内容        logger.info(f"使用OpenAI进行内容润色 (风格: {style}, 流式输出: {stream})")        print(f"\n正在使用OpenAI进行内容润色 (风格: {style})...")
        formatted_content = content_generator.format_with_openai(            recent_entries, style=style, stream=stream        )
        if formatted_content:            if not stream:  # 只有非流式处理才需要显示预览                print("\n润色后内容预览 (前500字):")                print("-" * 80)                print(formatted_content[:500] + "...(更多内容已保存到文件)")                print("-" * 80)
            # 确定输出文件名            output_file = args.output            if not output_file:                today = datetime.datetime.now().strftime("%Y-%m-%d")                output_file = f"ai_news_{style}_{today}.md"
            # 保存到文件            saved_file = content_generator.save_to_file(formatted_content, output_file)
            if saved_file:                print(f"完整内容已保存到 {saved_file}")        else:            logger.error("内容生成失败")            print("内容生成失败,请查看日志获取详细信息")
    except Exception as e:        logger.error(f"处理过程中出错: {e}")        logger.debug(traceback.format_exc())        print(f"处理过程中出错: {e}")        print("请查看日志获取详细信息")
if __name__ == "__main__":    try:        import sys        main()    except KeyboardInterrupt:        logger.info("用户中断执行")        print("\n程序已中断")    except Exception as e:        logger.critical(f"未捕获的异常: {e}")        logger.debug(traceback.format_exc())        print(f"程序遇到错误: {e}")        sys.exit(1)

53AI,企业落地大模型首选服务商

产品:场景落地咨询+大模型应用平台+行业解决方案

承诺:免费场景POC验证,效果验证后签署服务协议。零风险落地应用大模型,已交付160+中大型企业

联系我们

售前咨询
186 6662 7370
预约演示
185 8882 0121

微信扫码

添加专属顾问

回到顶部

加载中...

扫码咨询