Support us and view this ad

可选:点击以支持我们的网站

免费文章

“实现调用DeepSeek API的Celery任务”或“创建Flask提交选题的路由”;2.修改 tasks.py:在其中导入 SessionLocal 和 Task 模型,重写 generate_article 函数,使其从数据库获取任务、调用DeepSeek API、再将结果更新回数据库。 创建 app.py 路由:提供Web界面,让用户提交选题(即向 task 表插入新记录)和触发审核发布。 实现 platform_publishers 模块:为每个平台(joomla, wechat_mp等)编写具体的发布函数,从 PublishPlatform.config 中读取配置并调用对应API。 因为 要完整的解决方案,所以从最核心的Celery任务开始,因为这是整个自动化流程的引擎。然后构建Flask Web界面让用户可以提交任务,最后实现各个平台的发布函数。 要实现"从数据库获取任务、调用DeepSeek API、再将结果更新回数据库"的完整流程,需要设计一个可靠的任务处理机制。对于Flask路由,需要设计一个简单但功能完整的Web界面。对于平台发布模块,需要考虑到不同平台API的差异性,设计一个可扩展的架构。 从最核心的自动化引擎开始。在数据库验证成功后,下一步就是实现调用DeepSeek API的Celery任务和提供操作界面的Flask应用。 以下是实现 tasks.py 和 app.py 的核心代码与操作指南。 🔧 第一步:实现核心引擎 - 重写 tasks.py 这个文件是整个系统的异步任务中枢,负责执行最耗时的AI生成和内容发布工作。 1. 创建 tasks.py 文件 在您的项目目录 ~/ContentAutomationHub 下,创建并编辑 tasks.py 文件: # ~/ContentAutomationHub/tasks.pyimport osimport requestsimport jsonfrom celery import Celeryfrom models import SessionLocal, Task, PublishPlatform, PublishLog # 1. 初始化Celery应用,连接到Redis# broker和backend的地址需根据您的Redis配置调整(如果Redis运行在默认端口可不变)app = Celery('content_tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0') # 2. DeepSeek API配置 - 【关键】请务必从环境变量读取,不要硬编码!DEEPSEEK_API_KEY = os.environ.get("DEEPSEEK_API_KEY", "您的API密钥")DEEPSEEK_API_URL = "https://api.deepseek.com/v1/chat/completions" @app.task(bind=True)def generate_article(self, task_id): """ 核心任务:从数据库获取任务,调用DeepSeek API生成文章,并更新回数据库。 """ db = SessionLocal() try: # 1. 从数据库获取任务 task = db.query(Task).filter(Task.id == task_id).first() if not task: print(f"任务 {task_id} 不存在") return {"status": "failed", "reason": "Task not found"} print(f"开始为任务 {task_id} ({task.title}) 生成文章...") task.status = 'generating' db.commit() # 2. 精心设计提示词,这是生成高质量文章的关键 prompt = f"""你是一位资深的电子通信与嵌入式开发领域的技术专家。请围绕以下主题,撰写一篇深入、专业且实用的技术文章。 **文章主题:** {task.title} **核心关键词:** {task.keywords}  **文章要求:**1. 结构清晰,包含引言、技术原理分析、实战应用或方案对比、总结展望。2. 语言严谨专业,但避免过于晦涩,确保中级开发者能理解。3. 内容必须基于真实技术原理,可适当举例说明。4. 字数在1500-2000字左右。请直接开始文章的正文内容,无需在开头重复标题。""" # 3. 调用DeepSeek API headers = { 'Authorization': f'Bearer {DEEPSEEK_API_KEY}', 'Content-Type': 'application/json' } payload = { "model": "deepseek-chat", "messages": [{"role": "user", "content": prompt}], "temperature": 0.7, # 控制创造性,技术文章可调低 "max_tokens": 4000 } response = requests.post(DEEPSEEK_API_URL, headers=headers, json=payload, timeout=120) response.raise_for_status() # 检查HTTP错误 result = response.json() # 4. 提取生成的文本并更新数据库 article_content = result['choices'][0]['message']['content'] task.ai_raw_content = article_content task.status = 'review' # 更新为“待审核”状态 db.commit() print(f"任务 {task_id} 文章生成完成,状态已更新为‘待审核’") return {"status": "success", "task_id": task_id} except requests.exceptions.RequestException as e: print(f"API调用失败: {e}") if db and task: task.status = 'failed' db.commit() return {"status": "failed", "reason": f"API Error: {e}"} except Exception as e: print(f"任务处理发生未知错误: {e}") return {"status": "failed", "reason": str(e)} finally: db.close() @app.task(bind=True)def publish_article(self, task_id, platform_names): """ 发布任务:将审核通过的文章发布到指定的多个平台。...

继续阅读完整内容

支持我们的网站,请点击查看下方广告

正在加载广告...

登陆