提交 8581f845 作者: 樊倪骄

爬虫

父级
# Python cache
__pycache__/
*.py[cod]
*.pyo
*.pyd
*.so
# Virtual environments
.venv/
venv/
env/
ENV/
# Test and tooling cache
.pytest_cache/
.mypy_cache/
.ruff_cache/
.coverage
.coverage.*
htmlcov/
# Build artifacts
build/
dist/
.eggs/
*.egg-info/
pip-wheel-metadata/
# Local environment and secrets
.env
.env.*
!.env.example
# Local runtime files
*.log
logs/
.data/
tmp/
temp/
*.db
*.sqlite
*.sqlite3
# IDE and OS files
.vscode/
.idea/
.DS_Store
Thumbs.db
# Jupyter
.ipynb_checkpoints/
"""
新闻爬虫 API 服务
支持并发爬取、动态内容获取、多种解析方案
"""
import asyncio
import logging
import threading
import time
from dataclasses import dataclass
from enum import Enum
from typing import List, Optional, Dict, Tuple
import requests
import uvicorn
from bs4 import BeautifulSoup
from fastapi import FastAPI, HTTPException
from newspaper import Article, ArticleException
from playwright.async_api import async_playwright
from pydantic import BaseModel
# ===========================
# 配置管理
# ===========================
@dataclass
class CrawlerConfig:
"""爬虫配置"""
# 并发控制
max_concurrent: int = 10 # 提升到 5 个并发任务
# 超时设置(统一使用秒作为单位)
default_timeout: int = 1000 # 总超时时间(秒)
static_request_timeout: int = 10 # 静态请求超时(秒)
playwright_page_timeout: int = 45 # Playwright 页面加载超时(秒,降低以加快失败检测)
content_selector_timeout: int = 5 # 内容选择器等待超时(秒,降低以加快速度)
# 滚动设置
default_scroll_times: int = 2 # 减少滚动次数加快速度(从 3 改为 2)
scroll_wait_time: int = 800 # 滚动等待时间(毫秒,从 1000 改为 800)
# 内容验证
min_content_length: int = 50
# User-Agent
user_agent: str = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
# 内容选择器
content_selectors: List[str] = None
title_selectors: List[str] = None
def __post_init__(self):
if self.content_selectors is None:
self.content_selectors = [
'.xeditor_content.app_h5_article',
'.xeditor_content',
'.article-body',
'.article-content',
'.content',
'article',
'[class*="article"]',
'[class*="content"]'
]
if self.title_selectors is None:
self.title_selectors = ['h1', '.title', '.article-title', 'title']
# 全局配置实例
config = CrawlerConfig()
# ===========================
# 日志配置
# ===========================
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# ===========================
# 数据模型
# ===========================
class CrawlRequest(BaseModel):
"""爬取请求模型"""
urls: List[str]
scroll_times: Optional[int] = config.default_scroll_times
timeout: Optional[int] = config.default_timeout
class NewsItem(BaseModel):
"""新闻内容模型"""
url: str
title: str
text: str
class CrawlResponse(BaseModel):
"""爬取响应模型"""
success: bool
total: int
success_count: int
failed_count: int
results: List[NewsItem]
failed_urls: List[str]
elapsed_time: float
class FetchMethod(Enum):
"""HTML获取方法"""
STATIC = "static"
DYNAMIC = "dynamic"
# ===========================
# HTML 获取层
# ===========================
# Playwright 并发控制(使用异步信号量)
_playwright_semaphore = asyncio.Semaphore(5) # 允许 5 个 Playwright 实例并发运行
class HTMLFetcher:
"""HTML 内容获取器"""
@staticmethod
def fetch_static(url: str, timeout: int = None) -> Optional[str]:
"""使用 requests 获取静态 HTML"""
timeout = timeout or config.static_request_timeout
try:
headers = {'User-Agent': config.user_agent}
response = requests.get(url, headers=headers, timeout=timeout)
response.raise_for_status()
response.encoding = response.apparent_encoding
logger.info(f"✓ 静态获取成功: {url}")
return response.text
except Exception as e:
logger.warning(f"静态获取失败 {url}: {e}")
return None
@staticmethod
async def fetch_dynamic(url: str, scroll_times: int = None, timeout: int = None) -> Optional[str]:
"""使用 Playwright 异步获取动态 HTML(适用于 FastAPI 异步环境)"""
scroll_times = scroll_times or config.default_scroll_times
timeout = timeout or config.playwright_page_timeout
timeout_ms = timeout * 1000
content_timeout_ms = config.content_selector_timeout * 1000
# 🔒 使用异步信号量限制并发(避免资源竞争)
async with _playwright_semaphore:
logger.info(f"🌐 正在启动 Playwright 浏览器: {url}")
try:
# 使用异步 Playwright API
async with async_playwright() as p:
browser = await p.chromium.launch(
headless=True,
args=[
'--no-sandbox',
'--disable-setuid-sandbox',
'--disable-dev-shm-usage',
'--disable-gpu'
]
)
context = await browser.new_context(user_agent=config.user_agent)
page = await context.new_page()
try:
# 增加一层超时保护
logger.info(f" ├─ 正在跳转 URL...")
await page.goto(url, wait_until="domcontentloaded", timeout=timeout_ms)
# 尝试等待内容
try:
selectors = ", ".join(config.content_selectors[:4])
await page.wait_for_selector(selectors, timeout=content_timeout_ms)
except:
pass
# 安全滚动(处理 document.body 为 null 的情况)
for i in range(scroll_times):
try:
# 使用更安全的滚动方式
await page.evaluate("""
() => {
const scrollTarget = document.body || document.documentElement;
if (scrollTarget && scrollTarget.scrollHeight) {
window.scrollTo({
top: scrollTarget.scrollHeight,
behavior: 'smooth'
});
}
}
""")
await page.wait_for_timeout(config.scroll_wait_time)
except Exception as scroll_err:
logger.debug(f"滚动第 {i+1} 次失败(忽略): {scroll_err}")
break # 滚动失败就停止滚动,继续获取内容
html = await page.content()
logger.info(f" └─ ✓ 动态获取完成")
return html
except Exception as page_err:
logger.error(f"页面操作异常: {page_err}")
raise
finally:
# 安全关闭浏览器
try:
await browser.close()
except Exception as close_err:
logger.debug(f"浏览器关闭异常(忽略): {close_err}")
except Exception as e:
logger.error(f"✗ Playwright 运行异常: {e}")
return None
@classmethod
async def fetch_with_fallback(
cls,
url: str,
scroll_times: int = None,
timeout: int = None,
cancel_event: Optional[threading.Event] = None
) -> Optional[str]:
"""先尝试静态获取,失败则使用动态方法(异步版本)"""
# 检查取消标志
if cancel_event and cancel_event.is_set():
return None
# 尝试静态获取(在线程池中运行同步代码)
loop = asyncio.get_event_loop()
html = await loop.run_in_executor(None, cls.fetch_static, url)
# 再次检查取消标志
if cancel_event and cancel_event.is_set():
return None
if html and cls._validate_html_content(html):
return html
# 最后检查取消标志(避免在已超时时启动 Playwright)
if cancel_event and cancel_event.is_set():
logger.info(f"任务已取消,跳过动态获取: {url}")
return None
# 静态获取失败或内容不足,尝试动态获取
logger.info(f"切换到动态获取: {url}")
return await cls.fetch_dynamic(url, scroll_times, timeout)
@staticmethod
def _validate_html_content(html: str) -> bool:
"""验证 HTML 是否包含有效内容"""
try:
soup = BeautifulSoup(html, 'html.parser')
for selector in config.content_selectors:
if soup.select_one(selector):
return True
return False
except:
return False
# ===========================
# 内容提取层
# ===========================
class ContentExtractor:
"""内容提取器"""
@staticmethod
def extract_with_newspaper(html: str, url: str) -> Optional[Dict]:
"""使用 Newspaper3k 提取内容"""
try:
article = Article(url)
article.set_html(html)
article.parse()
if article.text and len(article.text.strip()) >= config.min_content_length:
return {
"url": url,
"title": article.title,
"text": article.text,
}
logger.debug(f"Newspaper3k 提取内容不足: {url}")
return None
except (ArticleException, Exception) as e:
logger.debug(f"Newspaper3k 解析失败 {url}: {e}")
return None
@staticmethod
def extract_with_beautifulsoup(html: str, url: str) -> Optional[Dict]:
"""使用 BeautifulSoup 提取内容"""
try:
soup = BeautifulSoup(html, 'html.parser')
# 提取标题
title = ""
for selector in config.title_selectors:
elem = soup.select_one(selector)
if elem:
title = elem.get_text(strip=True)
break
# 提取内容
content_text = ""
for selector in config.content_selectors:
elem = soup.select_one(selector)
if elem:
# 移除不需要的标签
for tag in elem(["script", "style", "a"]):
tag.decompose()
content_text = elem.get_text(separator='\n', strip=True)
if len(content_text) >= config.min_content_length:
break
# 如果内容不足,尝试提取所有段落
if len(content_text) < config.min_content_length:
paragraphs = soup.find_all('p')
content_text = '\n'.join(
p.get_text(strip=True)
for p in paragraphs
if p.get_text(strip=True)
)
if len(content_text) >= config.min_content_length:
return {
"url": url,
"title": title or (soup.title.string if soup.title else ""),
"text": content_text
}
logger.warning(f"BeautifulSoup 提取内容不足: {url}")
return None
except Exception as e:
logger.error(f"BeautifulSoup 解析失败 {url}: {e}")
return None
@classmethod
def extract(cls, html: str, url: str) -> Optional[Dict]:
"""提取内容(先用 Newspaper3k,失败则用 BeautifulSoup)"""
if not html:
return None
# 尝试 Newspaper3k
result = cls.extract_with_newspaper(html, url)
if result:
return result
# 回退到 BeautifulSoup
return cls.extract_with_beautifulsoup(html, url)
# ===========================
# 爬虫核心层
# ===========================
class NewsCrawler:
"""新闻爬虫(异步版本)"""
def __init__(self, config: CrawlerConfig = None):
self.config = config or globals()['config']
self.fetcher = HTMLFetcher()
self.extractor = ContentExtractor()
async def crawl_url(
self,
url: str,
scroll_times: int = None,
timeout: int = None,
cancel_event: Optional[threading.Event] = None
) -> Optional[Dict]:
"""爬取单个 URL(异步版本)"""
try:
# 检查取消标志
if cancel_event and cancel_event.is_set():
return None
logger.info(f"开始爬取: {url}")
# 获取 HTML
html = await self.fetcher.fetch_with_fallback(url, scroll_times, timeout, cancel_event)
# 再次检查取消标志
if cancel_event and cancel_event.is_set():
return None
if not html:
# 检查是否因为取消而失败
if cancel_event and cancel_event.is_set():
return None
logger.error(f"✗ 无法获取 HTML: {url}")
return None
# 提取内容
result = self.extractor.extract(html, url)
# 最后检查取消标志
if cancel_event and cancel_event.is_set():
return None
if result:
logger.info(f"✓ 成功爬取: {result['title'][:50]}...")
else:
logger.error(f"✗ 无法提取内容: {url}")
return result
except Exception as e:
# 检查是否因为取消而产生异常
if cancel_event and cancel_event.is_set():
return None
logger.error(f"爬取异常 {url}: {e}")
return None
# ===========================
# 并发控制层
# ===========================
class ConcurrentCrawler:
"""并发爬虫管理器(异步版本)"""
def __init__(self, max_concurrent: int = None):
self.max_concurrent = max_concurrent or config.max_concurrent
self.semaphore = asyncio.Semaphore(self.max_concurrent)
self.crawler = NewsCrawler()
async def crawl_with_semaphore(
self,
url: str,
scroll_times: int,
timeout: int,
cancel_event: threading.Event
) -> Tuple[str, Optional[Dict]]:
"""带信号量控制的异步爬取"""
async with self.semaphore:
# 检查取消标志
if cancel_event.is_set():
logger.info(f"[并发] 任务已取消: {url}")
return (url, None)
# 直接调用异步爬虫方法
result = await self.crawler.crawl_url(
url,
scroll_times,
timeout,
cancel_event
)
# 再次检查取消标志
if cancel_event.is_set():
logger.info(f"[并发] 任务运行中被取消: {url}")
return (url, None)
return (url, result)
async def crawl_batch(
self,
urls: List[str],
scroll_times: int,
timeout: int
) -> Tuple[List[Dict], List[str], float]:
"""批量爬取 URL"""
if not urls:
return [], [], 0.0
logger.info(f"开始批量爬取: {len(urls)} 个URL, 最多 {self.max_concurrent} 个并发, 超时 {timeout}秒")
start_time = time.time()
# 创建取消事件
cancel_event = threading.Event()
# 创建所有任务
tasks = [
asyncio.create_task(
self.crawl_with_semaphore(url, scroll_times, timeout, cancel_event)
)
for url in urls
]
# 执行任务(带总超时)
try:
results = await asyncio.wait_for(
asyncio.gather(*tasks, return_exceptions=True),
timeout=timeout
)
except asyncio.TimeoutError:
logger.warning(f"⚠ 请求超时 ({timeout}秒),强制结束")
cancel_event.set()
results = await self._collect_results(tasks)
# 处理结果
success_items = []
failed_urls = []
for i, result in enumerate(results):
url = urls[i]
# 检查是否是异常
if isinstance(result, Exception):
logger.error(f"✗ URL异常 {url}: {result}")
failed_urls.append(url)
# 检查是否是有效的元组结果
elif isinstance(result, tuple) and len(result) == 2:
_, data = result
if data:
success_items.append(data)
else:
failed_urls.append(url)
else:
# 其他情况都视为失败
failed_urls.append(url)
elapsed_time = min(time.time() - start_time, timeout)
logger.info(f"批量爬取完成: 成功 {len(success_items)}, 失败 {len(failed_urls)}, 耗时 {elapsed_time:.2f}秒")
return success_items, failed_urls, elapsed_time
@staticmethod
def _collect_results_immediate(tasks: List[asyncio.Task], urls: List[str]) -> List:
"""立即收集已完成的任务结果,不等待未完成的任务
Args:
tasks: 任务列表
urls: URL列表(用于日志)
Returns:
结果列表(已完成的结果 + 未完成任务的超时标记)
"""
results = []
for i, task in enumerate(tasks):
if task.done():
# 任务已完成,获取结果
try:
results.append(task.result())
except (asyncio.CancelledError, Exception) as e:
results.append(e)
else:
# 任务未完成,取消它但不等待
task.cancel()
logger.info(f"取消未完成任务: {urls[i]}")
results.append(TimeoutError("任务超时"))
return results
@staticmethod
async def _collect_results(tasks: List[asyncio.Task]) -> List:
"""收集任务结果(超时情况,等待取消完成)
注意:此方法会等待任务取消,不建议在超时场景使用
"""
results = []
for task in tasks:
if task.done():
try:
results.append(task.result())
except (asyncio.CancelledError, Exception) as e:
results.append(e)
else:
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
results.append(TimeoutError("任务超时"))
return results
# ===========================
# FastAPI 应用层
# ===========================
app = FastAPI(
title="新闻爬虫 API",
description="支持并发爬取、动态内容获取、多种解析方案",
version="2.0.0"
)
# 全局爬虫实例
concurrent_crawler = ConcurrentCrawler()
@app.post("/crawl", response_model=CrawlResponse)
async def crawl_news(request: CrawlRequest):
"""
批量爬取新闻内容
- **urls**: 要爬取的链接列表
- **scroll_times**: 页面滚动次数(可选,默认3次)
- **timeout**: 总超时时间,单位秒(可选,默认1000秒)
返回成功和失败的结果统计
"""
if not request.urls:
raise HTTPException(status_code=400, detail="链接列表不能为空")
# 批量爬取
success_items, failed_urls, elapsed_time = await concurrent_crawler.crawl_batch(
urls=request.urls,
scroll_times=request.scroll_times,
timeout=request.timeout
)
# 构建响应
return CrawlResponse(
success=True,
total=len(request.urls),
success_count=len(success_items),
failed_count=len(failed_urls),
results=[NewsItem(**item) for item in success_items],
failed_urls=failed_urls,
elapsed_time=elapsed_time
)
@app.get("/")
async def root():
"""API 根路径"""
return {
"name": "新闻爬虫 API",
"version": "2.0.0",
"endpoints": {
"/crawl": "POST - 批量爬取新闻",
"/health": "GET - 健康检查",
"/docs": "GET - API 文档",
"/config": "GET - 查看配置"
}
}
@app.get("/health")
async def health():
"""健康检查"""
return {
"status": "healthy",
"max_concurrent": config.max_concurrent
}
@app.get("/config")
async def get_config():
"""查看当前配置"""
return {
"max_concurrent": config.max_concurrent,
"default_timeout": config.default_timeout,
"default_scroll_times": config.default_scroll_times,
"min_content_length": config.min_content_length
}
# ===========================
# 启动入口
# ===========================
if __name__ == "__main__":
uvicorn.run(
app,
host="0.0.0.0",
port=8106,
log_level="info"
)
import json
import time
import requests
from playwright.sync_api import sync_playwright
from newspaper import Article, ArticleException
from bs4 import BeautifulSoup
def get_static_html(url, timeout=10):
"""用 requests 获取静态 HTML(优先方案)"""
try:
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
}
response = requests.get(url, headers=headers, timeout=timeout)
response.raise_for_status()
response.encoding = response.apparent_encoding # 自动检测编码
return response.text
except Exception as e:
print(f"requests 获取 {url} 失败:{e}")
return None
def get_dynamic_html(url, scroll_times=3, timeout=1000):
"""用 Playwright 获取动态 HTML(备用方案)"""
with sync_playwright() as p:
browser = p.chromium.launch(headless=True)
# 设置用户代理
context = browser.new_context(
user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
)
page = context.new_page()
try:
page.goto(url, wait_until="networkidle", timeout=timeout*1000)
# 尝试等待内容区域加载(针对东方财富等网站)
try:
page.wait_for_selector(".xeditor_content, .article-body, article, .content", timeout=10000)
except:
pass # 如果找不到特定元素,继续执行
for _ in range(scroll_times):
page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
page.wait_for_timeout(1000)
html = page.content()
except Exception as e:
print(f"Playwright 爬取 {url} 失败:{e}")
html = None
finally:
browser.close()
return html
def get_html_with_fallback(url, scroll_times=3, timeout=1000):
"""先尝试用 requests + BeautifulSoup,失败再用 Playwright"""
print(f"尝试使用 requests 获取 {url}...")
html = get_static_html(url)
if html:
# 尝试用 BeautifulSoup 验证是否能提取到内容
soup = BeautifulSoup(html, 'html.parser')
content_selectors = [
'.xeditor_content.app_h5_article',
'.xeditor_content',
'.article-body',
'.article-content',
'.content',
'article',
'[class*="article"]',
'[class*="content"]'
]
# 检查是否能找到内容
content_found = False
for selector in content_selectors:
if soup.select_one(selector):
content_found = True
break
# 如果找到了内容,返回 HTML
if content_found:
print(f"✓ requests 成功获取 {url},内容可用")
return html
else:
print(f"⚠ requests 获取了 HTML,但未找到内容,尝试使用 Playwright...")
else:
print(f"⚠ requests 获取失败,尝试使用 Playwright...")
# 如果静态获取失败或找不到内容,使用 Playwright
print(f"使用 Playwright 获取 {url}...")
html = get_dynamic_html(url, scroll_times, timeout)
if html:
print(f"✓ Playwright 成功获取 {url}")
return html
def extract_with_beautifulsoup(html, url):
"""用 BeautifulSoup 直接提取内容(备用方案)"""
try:
soup = BeautifulSoup(html, 'html.parser')
# 尝试多种常见的内容选择器
content_selectors = [
'.xeditor_content.app_h5_article', # 东方财富
'.xeditor_content',
'.article-body',
'.article-content',
'.content',
'article',
'[class*="article"]',
'[class*="content"]'
]
content_text = ""
title = ""
# 提取标题
title_selectors = ['h1', '.title', '.article-title', 'title']
for selector in title_selectors:
title_elem = soup.select_one(selector)
if title_elem:
title = title_elem.get_text(strip=True)
break
# 提取内容
for selector in content_selectors:
content_elem = soup.select_one(selector)
if content_elem:
# 移除脚本和样式标签
for script in content_elem(["script", "style", "a"]):
script.decompose()
content_text = content_elem.get_text(separator='\n', strip=True)
if content_text and len(content_text) > 50: # 确保内容足够长
break
# 如果还是没找到,尝试提取所有p标签
if not content_text or len(content_text) < 50:
paragraphs = soup.find_all('p')
content_text = '\n'.join([p.get_text(strip=True) for p in paragraphs if p.get_text(strip=True)])
return {
"url": url,
"title": title or soup.title.string if soup.title else "",
"text": content_text
}
except Exception as e:
print(f"BeautifulSoup 解析 {url} 失败:{e}")
return None
def extract_news_content(html, url):
"""用 Newspaper3k 提取内容,失败则用 BeautifulSoup"""
if not html:
return None
# 首先尝试用 Newspaper3k
article = Article(url)
article.set_html(html)
try:
article.parse()
# 如果提取的内容为空或太短,使用备用方案
if article.text and len(article.text.strip()) > 50:
return {
"url": url,
"title": article.title,
"text": article.text,
}
else:
print(f"Newspaper3k 提取的内容为空或太短,尝试使用 BeautifulSoup...")
except ArticleException as e:
print(f"Newspaper3k 解析 {url} 失败:{e},尝试使用 BeautifulSoup...")
except Exception as e:
print(f"Newspaper3k 解析 {url} 发生错误:{e},尝试使用 BeautifulSoup...")
# 备用方案:使用 BeautifulSoup
return extract_with_beautifulsoup(html, url)
def batch_crawl_news(url_list, output_file="news_aggregated.json"):
"""批量爬取并保存新闻内容"""
news_list = []
for i, url in enumerate(url_list, 1):
print(f"\n正在处理第 {i}/{len(url_list)} 个链接:{url}")
# 先尝试 requests + BeautifulSoup,失败再用 Playwright
html = get_html_with_fallback(url)
if not html:
print(f"✗ 无法获取 {url} 的 HTML,跳过")
continue
# 提取内容
news_data = extract_news_content(html, url)
if news_data:
news_list.append(news_data)
print(f"✓ 成功提取内容:{news_data.get('title', '无标题')[:50]}...")
else:
print(f"✗ 无法提取 {url} 的内容")
# 避免请求过快被封,间隔 2 秒
time.sleep(2)
# 保存到 JSON 文件
with open(output_file, "w", encoding="utf-8") as f:
json.dump(news_list, f, ensure_ascii=False, indent=2)
print(f"\n所有处理完成,共提取 {len(news_list)} 条新闻,已保存到 {output_file}")
# 测试:批量爬取科技新闻
if __name__ == "__main__":
# 待爬取的新闻链接列表(可替换为其他链接)
news_urls = [
"http://www.chnmodel.com/zcjh/moq/2018-07-23/5501.html",
"http://caifuhao.eastmoney.com/news/20240225070209973371220",
"https://caifuhao.eastmoney.com/news/20240225085325506722330",
"https://www.toutiao.com/article/7353226433517175305/",
"https://www.waytoagi.com/question/56972",
"http://www.chnmodel.com/qzgl/2018-07-18/4355.html",
"https://max.book118.com/html/2024/0125/6000210030010041.shtm"
]
# 批量爬取并保存
batch_crawl_news(news_urls, "tech_news.json")
\ No newline at end of file
"""
并发性能基准测试
对比不同并发数的爬取速度
"""
import asyncio
import time
from playwright.async_api import async_playwright
async def crawl_page_simple(browser, url: str, index: int):
"""简化的页面爬取(用于基准测试)"""
try:
page = await browser.new_page()
await page.goto(url, wait_until="domcontentloaded", timeout=30000)
title = await page.title()
await page.close()
return {"url": url, "title": title, "index": index}
except Exception as e:
return {"url": url, "error": str(e), "index": index}
async def benchmark(urls: list, max_concurrent: int):
"""基准测试指定并发数"""
print(f"\n{'='*60}")
print(f"测试配置: {len(urls)} 个URL, 并发数: {max_concurrent}")
print(f"{'='*60}")
start_time = time.time()
async with async_playwright() as p:
browser = await p.chromium.launch(
headless=True,
args=['--no-sandbox', '--disable-setuid-sandbox', '--disable-dev-shm-usage']
)
semaphore = asyncio.Semaphore(max_concurrent)
async def crawl_with_semaphore(url, index):
async with semaphore:
return await crawl_page_simple(browser, url, index)
tasks = [crawl_with_semaphore(url, i+1) for i, url in enumerate(urls)]
results = await asyncio.gather(*tasks, return_exceptions=True)
await browser.close()
elapsed = time.time() - start_time
success_count = sum(1 for r in results if isinstance(r, dict) and "error" not in r)
print(f"\n✓ 完成!")
print(f" 总耗时: {elapsed:.2f}秒")
print(f" 成功: {success_count}/{len(urls)}")
print(f" 平均速度: {elapsed/len(urls):.2f}秒/页")
return elapsed, success_count
async def main():
"""主函数:对比不同并发数"""
# 测试 URL
test_urls = [
"https://docs.python.org/3/",
"https://github.com/trending",
"https://news.ycombinator.com/",
"https://www.wikipedia.org/",
"https://stackoverflow.com/",
"https://developer.mozilla.org/",
"https://www.rust-lang.org/",
"https://golang.org/",
"https://www.typescriptlang.org/",
"https://reactjs.org/",
]
print("\n" + "="*60)
print("并发性能对比测试")
print("="*60)
# 测试不同的并发数
concurrent_levels = [1, 2, 5, 10]
results = {}
for concurrent in concurrent_levels:
elapsed, success = await benchmark(test_urls, concurrent)
results[concurrent] = elapsed
await asyncio.sleep(2) # 间隔 2 秒避免服务器压力
# 打印对比结果
print("\n" + "="*60)
print("性能对比总结")
print("="*60)
baseline = results[1] # 以并发1为基准
for concurrent, elapsed in results.items():
speedup = baseline / elapsed
print(f"并发 {concurrent:2d}: {elapsed:6.2f}秒 (提速 {speedup:.2f}x)")
# 推荐配置
print("\n" + "="*60)
print("推荐配置")
print("="*60)
print("📊 根据测试结果:")
print(" • 并发 2-3:适合网络不稳定或资源受限场景")
print(" • 并发 5: 推荐配置,平衡性能和稳定性 ⭐")
print(" • 并发 8-10:追求极致速度,需要较好的网络和硬件")
print("\n当前 API 配置: 5 个并发 ✓")
if __name__ == "__main__":
asyncio.run(main())
"""
Gunicorn 配置文件(生产环境推荐)
使用方式:
gunicorn -c gunicorn_config.py api_server_optimized:app
或者:
gunicorn api_server_optimized:app \
--workers 4 \
--worker-class uvicorn.workers.UvicornWorker \
--bind 0.0.0.0:8106 \
--timeout 300
"""
import multiprocessing
import os
# ==========================================
# 服务器配置
# ==========================================
# 绑定地址和端口
bind = "0.0.0.0:8106"
# Worker 进程数
# 推荐:(2 x CPU核心数) + 1
# 对于爬虫服务,建议不要设置太多(因为每个 worker 都会有自己的并发池)
workers = int(os.getenv("WORKERS", "2")) # 默认 2 个 worker
# Worker 类(必须使用 UvicornWorker 以支持 ASGI/异步)
worker_class = "uvicorn.workers.UvicornWorker"
# 每个 worker 的线程数(对于异步应用,通常设为 1)
threads = 1
# ==========================================
# Worker 进程管理
# ==========================================
# Worker 超时时间(秒)
# 如果 worker 在这个时间内没有响应,会被强制重启
# 对于爬虫任务,需要设置较长的超时时间
timeout = 300 # 5 分钟
# 优雅重启超时时间(秒)
graceful_timeout = 30
# Worker 静默超时时间(秒)
# Worker 在这段时间内没有处理任何请求会被认为已卡死
keepalive = 5
# ==========================================
# 性能调优
# ==========================================
# 最大请求数(处理这么多请求后 worker 会重启,防止内存泄漏)
max_requests = 1000
max_requests_jitter = 50 # 随机抖动,避免所有 worker 同时重启
# 工作目录
chdir = os.path.dirname(os.path.abspath(__file__))
# ==========================================
# 日志配置
# ==========================================
# 日志级别
loglevel = "info"
# 访问日志格式
accesslog = "-" # 输出到 stdout
access_log_format = '%(h)s %(l)s %(u)s %(t)s "%(r)s" %(s)s %(b)s "%(f)s" "%(a)s" %(D)s'
# 错误日志
errorlog = "-" # 输出到 stderr
# 捕获输出(捕获 print 和标准输出)
capture_output = True
# ==========================================
# 进程命名
# ==========================================
proc_name = "crawler_api"
# ==========================================
# 服务器机制
# ==========================================
# 守护进程(后台运行,生产环境建议用 systemd/supervisor 管理)
daemon = False
# PID 文件
pidfile = "/tmp/crawler_api.pid"
# 用户和组(如果需要降权运行)
# user = "nobody"
# group = "nogroup"
# ==========================================
# 服务器钩子(可选)
# ==========================================
def on_starting(server):
"""服务器启动前"""
print(f"🚀 Gunicorn 正在启动...")
print(f" Workers: {workers}")
print(f" Bind: {bind}")
print(f" Worker Class: {worker_class}")
def on_reload(server):
"""服务器重载时"""
print("🔄 Gunicorn 正在重新加载...")
def when_ready(server):
"""服务器准备就绪"""
print(f"✅ Gunicorn 已启动完成")
print(f" 监听地址: http://{bind}")
print(f" API 文档: http://{bind.split(':')[0]}:{bind.split(':')[1]}/docs")
def on_exit(server):
"""服务器退出"""
print("👋 Gunicorn 正在关闭...")
def worker_int(worker):
"""Worker 收到 SIGINT 信号"""
print(f"⚠️ Worker {worker.pid} 收到中断信号")
def worker_abort(worker):
"""Worker 被强制终止"""
print(f"❌ Worker {worker.pid} 被强制终止")
# ==========================================
# SSL/TLS(如果需要 HTTPS)
# ==========================================
# keyfile = "/path/to/key.pem"
# certfile = "/path/to/cert.pem"
# ssl_version = "TLSv1_2"
# cert_reqs = 0 # ssl.CERT_NONE
# ca_certs = None
# ciphers = None
"""
使用 Playwright 异步 API 实现并发爬取
支持 2 个并发任务
"""
import asyncio
import time
from playwright.async_api import async_playwright
async def crawl_page(browser, url: str, index: int):
"""
爬取单个页面
Args:
browser: Playwright 浏览器实例
url: 要爬取的URL
index: 任务索引
Returns:
包含 URL、标题和内容的字典
"""
print(f"[任务 {index}] 开始爬取: {url}")
start_time = time.time()
try:
# 创建新页面(每个任务独立的页面)
page = await browser.new_page()
# 访问页面
await page.goto(url, wait_until="domcontentloaded", timeout=30000)
print(f"[任务 {index}] 页面已加载: {url}")
# 等待内容加载(可选)
try:
await page.wait_for_selector("body", timeout=5000)
except:
pass
# 提取标题
title = await page.title()
# 提取正文内容
# 尝试多种常见的内容选择器
content = ""
selectors = [
".article-content",
".article-body",
".content",
"article",
".post-content",
"main"
]
for selector in selectors:
try:
element = await page.query_selector(selector)
if element:
content = await element.inner_text()
if len(content) > 100: # 确保内容足够长
break
except:
continue
# 如果没找到特定容器,提取所有段落
if not content or len(content) < 100:
paragraphs = await page.query_selector_all("p")
texts = []
for p in paragraphs:
text = await p.inner_text()
if text.strip():
texts.append(text.strip())
content = "\n".join(texts)
elapsed = time.time() - start_time
print(f"[任务 {index}] ✓ 完成: {title[:50]}... (耗时 {elapsed:.2f}秒)")
await page.close()
return {
"url": url,
"title": title,
"content": content[:500] + "..." if len(content) > 500 else content,
"content_length": len(content),
"elapsed": elapsed
}
except Exception as e:
elapsed = time.time() - start_time
print(f"[任务 {index}] ✗ 失败: {url} - {e} (耗时 {elapsed:.2f}秒)")
return {
"url": url,
"error": str(e),
"elapsed": elapsed
}
async def crawl_batch(urls: list, max_concurrent: int = 2):
"""
并发爬取多个页面
Args:
urls: URL 列表
max_concurrent: 最大并发数(默认2)
Returns:
爬取结果列表
"""
print(f"\n{'='*60}")
print(f"开始批量爬取: {len(urls)} 个URL, 最大并发数: {max_concurrent}")
print(f"{'='*60}\n")
overall_start = time.time()
async with async_playwright() as p:
# 启动浏览器(一个浏览器实例可以有多个页面)
browser = await p.chromium.launch(
headless=True,
args=[
'--no-sandbox',
'--disable-setuid-sandbox',
'--disable-dev-shm-usage',
]
)
# 创建信号量来控制并发数
semaphore = asyncio.Semaphore(max_concurrent)
async def crawl_with_semaphore(url, index):
"""带信号量控制的爬取"""
async with semaphore:
return await crawl_page(browser, url, index)
# 创建所有任务
tasks = [
crawl_with_semaphore(url, i + 1)
for i, url in enumerate(urls)
]
# 并发执行所有任务
results = await asyncio.gather(*tasks, return_exceptions=True)
# 关闭浏览器
await browser.close()
overall_elapsed = time.time() - overall_start
# 统计结果
success_count = sum(1 for r in results if isinstance(r, dict) and "error" not in r)
failed_count = len(results) - success_count
print(f"\n{'='*60}")
print(f"批量爬取完成!")
print(f"总耗时: {overall_elapsed:.2f}秒")
print(f"成功: {success_count} 个")
print(f"失败: {failed_count} 个")
print(f"{'='*60}\n")
return results
async def main():
"""主函数"""
# 测试 URL 列表(可以替换为你想爬取的网址)
test_urls = [
"https://www.toutiao.com/article/7353226433517175305/",
"https://www.waytoagi.com/question/56972",
"https://docs.python.org/3/",
]
# 执行爬取(5个并发)
results = await crawl_batch(test_urls, max_concurrent=5)
# 打印详细结果
print("\n详细结果:")
print("=" * 60)
for i, result in enumerate(results, 1):
if isinstance(result, dict):
if "error" in result:
print(f"\n[{i}] ✗ {result['url']}")
print(f" 错误: {result['error']}")
else:
print(f"\n[{i}] ✓ {result['url']}")
print(f" 标题: {result['title']}")
print(f" 内容长度: {result['content_length']} 字符")
print(f" 耗时: {result['elapsed']:.2f}秒")
else:
print(f"\n[{i}] ✗ 异常: {result}")
if __name__ == "__main__":
# 运行异步主函数
asyncio.run(main())
fastapi==0.104.1
uvicorn[standard]==0.24.0
pydantic==2.5.0
requests==2.31.0
playwright==1.40.0
newspaper3k==0.2.8
beautifulsoup4==4.12.2
lxml==4.9.3
#!/bin/bash
# 重启爬虫 API 服务
echo "=========================================="
echo "重启爬虫 API 服务"
echo "=========================================="
CURRENT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
# 停止服务
bash "$CURRENT_DIR/stop.sh"
# 等待 2 秒
sleep 2
# 启动服务
bash "$CURRENT_DIR/start_background.sh"
#!/bin/bash
# 后台运行爬虫 API 服务
echo "=========================================="
echo "后台启动爬虫 API 服务"
echo "=========================================="
# 激活虚拟环境(如果有)
if [ -d "venv" ]; then
source venv/bin/activate
fi
# 获取当前目录
CURRENT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
LOG_DIR="$CURRENT_DIR/logs"
PID_FILE="$CURRENT_DIR/crawler_api.pid"
# 创建日志目录
mkdir -p "$LOG_DIR"
# 检查是否已经在运行
if [ -f "$PID_FILE" ]; then
OLD_PID=$(cat "$PID_FILE")
if ps -p "$OLD_PID" > /dev/null 2>&1; then
echo "⚠️ 服务已在运行 (PID: $OLD_PID)"
echo "如需重启,请先执行: ./stop.sh"
exit 1
else
echo "清理过期的 PID 文件..."
rm -f "$PID_FILE"
fi
fi
# Worker 数量
WORKERS=${WORKERS:-2}
echo "配置信息:"
echo " Workers: $WORKERS"
echo " 日志目录: $LOG_DIR"
echo " PID 文件: $PID_FILE"
echo ""
# 使用 nohup 后台运行
nohup gunicorn api_server_optimized:app \
-c gunicorn_config.py \
--workers $WORKERS \
--worker-class uvicorn.workers.UvicornWorker \
--bind 0.0.0.0:8106 \
--timeout 300 \
--pid "$PID_FILE" \
--access-logfile "$LOG_DIR/access.log" \
--error-logfile "$LOG_DIR/error.log" \
--log-level info \
> "$LOG_DIR/gunicorn.log" 2>&1 &
# 等待启动
sleep 2
# 检查是否启动成功
if [ -f "$PID_FILE" ]; then
PID=$(cat "$PID_FILE")
if ps -p "$PID" > /dev/null 2>&1; then
echo "✅ 服务已成功启动!"
echo " PID: $PID"
echo " API 地址: http://localhost:8106"
echo " API 文档: http://localhost:8106/docs"
echo ""
echo "查看日志:"
echo " 访问日志: tail -f $LOG_DIR/access.log"
echo " 错误日志: tail -f $LOG_DIR/error.log"
echo " 主日志: tail -f $LOG_DIR/gunicorn.log"
echo ""
echo "停止服务: ./stop.sh"
else
echo "❌ 服务启动失败,请查看日志:"
echo " cat $LOG_DIR/gunicorn.log"
exit 1
fi
else
echo "❌ 启动失败,未生成 PID 文件"
exit 1
fi
#!/bin/bash
# 开发环境启动脚本(使用 Uvicorn,支持热重载)
echo "=========================================="
echo "启动爬虫 API 服务(开发模式)"
echo "=========================================="
# 激活虚拟环境(如果有)
if [ -d "venv" ]; then
echo "激活虚拟环境..."
source venv/bin/activate
fi
echo "启动 Uvicorn 开发服务器(支持热重载)..."
echo "API 地址: http://localhost:8106"
echo "API 文档: http://localhost:8106/docs"
echo ""
# 启动 Uvicorn(开发模式,支持热重载)
exec python3 api_server_optimized.py
# 或者使用 uvicorn 命令行(支持更多选项)
# exec uvicorn api_server_optimized:app \
# --host 0.0.0.0 \
# --port 8106 \
# --reload \
# --log-level info
#!/bin/bash
# 生产环境启动脚本(使用 Gunicorn)
echo "=========================================="
echo "启动爬虫 API 服务(生产模式)"
echo "=========================================="
# 激活虚拟环境(如果有)
if [ -d "venv" ]; then
echo "激活虚拟环境..."
source venv/bin/activate
fi
# 检查 gunicorn 是否安装
if ! command -v gunicorn &> /dev/null; then
echo "❌ Gunicorn 未安装"
echo "安装命令: pip install gunicorn"
exit 1
fi
# 获取 CPU 核心数
CPU_CORES=$(nproc 2>/dev/null || sysctl -n hw.ncpu 2>/dev/null || echo 2)
WORKERS=${WORKERS:-2} # 默认 2 个 worker
echo "系统 CPU 核心数: $CPU_CORES"
echo "启动 Worker 数量: $WORKERS"
echo ""
# 启动 Gunicorn
exec gunicorn api_server_optimized:app \
-c gunicorn_config.py \
--workers $WORKERS \
--worker-class uvicorn.workers.UvicornWorker \
--bind 0.0.0.0:8106 \
--timeout 300 \
--access-logfile - \
--error-logfile - \
--log-level info
#!/bin/bash
# 查看爬虫 API 服务状态
echo "=========================================="
echo "爬虫 API 服务状态"
echo "=========================================="
CURRENT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
PID_FILE="$CURRENT_DIR/crawler_api.pid"
# 检查 PID 文件
if [ -f "$PID_FILE" ]; then
PID=$(cat "$PID_FILE")
echo "PID 文件: $PID_FILE"
echo "主进程 PID: $PID"
echo ""
# 检查进程是否存在
if ps -p "$PID" > /dev/null 2>&1; then
echo "✅ 服务运行中"
echo ""
echo "进程信息:"
ps aux | head -1
ps aux | grep -E "($PID|gunicorn.*api_server_optimized)" | grep -v grep
echo ""
# 检查端口
echo "端口监听:"
netstat -tlnp 2>/dev/null | grep 8106 || ss -tlnp 2>/dev/null | grep 8106 || echo " 无法获取端口信息"
echo ""
# 测试 API 连接
echo "API 健康检查:"
if command -v curl &> /dev/null; then
RESPONSE=$(curl -s -o /dev/null -w "%{http_code}" http://localhost:8106/health 2>/dev/null)
if [ "$RESPONSE" = "200" ]; then
echo " ✅ API 响应正常 (HTTP $RESPONSE)"
curl -s http://localhost:8106/health | python3 -m json.tool 2>/dev/null || echo ""
else
echo " ⚠️ API 未响应或异常 (HTTP $RESPONSE)"
fi
else
echo " (curl 未安装,跳过检查)"
fi
else
echo "❌ 服务未运行(PID 文件存在但进程不存在)"
echo " 建议清理: rm -f $PID_FILE"
fi
else
echo "⚠️ PID 文件不存在"
# 尝试查找相关进程
PIDS=$(pgrep -f "gunicorn.*api_server_optimized")
if [ -n "$PIDS" ]; then
echo ""
echo "发现相关进程(可能未通过脚本启动):"
ps aux | head -1
ps aux | grep -E "gunicorn.*api_server_optimized" | grep -v grep
else
echo "❌ 服务未运行"
fi
fi
echo ""
echo "=========================================="
echo "快捷命令:"
echo " 启动: ./start_background.sh"
echo " 停止: ./stop.sh"
echo " 重启: ./restart.sh"
echo " 查看日志: tail -f logs/gunicorn.log"
echo "=========================================="
#!/bin/bash
# 停止后台运行的爬虫 API 服务
echo "=========================================="
echo "停止爬虫 API 服务"
echo "=========================================="
CURRENT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
PID_FILE="$CURRENT_DIR/crawler_api.pid"
# 检查 PID 文件是否存在
if [ ! -f "$PID_FILE" ]; then
echo "⚠️ PID 文件不存在,服务可能未运行"
# 尝试查找进程
PIDS=$(pgrep -f "gunicorn.*api_server_optimized")
if [ -n "$PIDS" ]; then
echo "发现相关进程:"
ps aux | grep -E "gunicorn.*api_server_optimized" | grep -v grep
echo ""
read -p "是否停止这些进程?(y/n) " -n 1 -r
echo
if [[ $REPLY =~ ^[Yy]$ ]]; then
echo "$PIDS" | xargs kill -TERM
echo "✅ 已发送停止信号"
fi
else
echo "未发现运行中的服务"
fi
exit 0
fi
# 读取 PID
PID=$(cat "$PID_FILE")
# 检查进程是否存在
if ! ps -p "$PID" > /dev/null 2>&1; then
echo "⚠️ 进程 $PID 不存在,清理 PID 文件..."
rm -f "$PID_FILE"
exit 0
fi
echo "正在停止服务 (PID: $PID)..."
# 发送 TERM 信号(优雅关闭)
kill -TERM "$PID"
# 等待进程结束
TIMEOUT=30
for i in $(seq 1 $TIMEOUT); do
if ! ps -p "$PID" > /dev/null 2>&1; then
echo "✅ 服务已停止"
rm -f "$PID_FILE"
exit 0
fi
echo -n "."
sleep 1
done
echo ""
echo "⚠️ 进程未在 $TIMEOUT 秒内停止,发送 KILL 信号..."
kill -KILL "$PID"
sleep 1
if ! ps -p "$PID" > /dev/null 2>&1; then
echo "✅ 服务已强制停止"
rm -f "$PID_FILE"
else
echo "❌ 无法停止进程 $PID"
exit 1
fi
[
{
"url": "http://www.chnmodel.com/zcjh/moq/2018-07-23/5501.html",
"title": "大视界模型有限公司--中国模型网",
"text": "当前位置: 首页 > 模型分享 > 沙盘模型公司 » 正文\n\n大视界模型有限公司 是国内目前最专业的模型设计制作公司之一,是由全国模型行业中众多有名的资深模型精英组成的一支经验丰富、技术精湛的专业模型队伍。 公司创立以来,专注于提供建筑模型、工业模型、机械模型、水利模型、电力模型、展馆模型、规划模型、方案模型、车辆模型、军事模型、木质模型、竹质模型、铜质模型、合金模型、玻璃钢模型、园林景观模型、山体地形模型、雕塑模型、浮雕模型、仿真植物模型、仿真生物模型、仿真动物模型、仿真静态模型、仿真动态模型、场景复原模型、多媒体数字模型、多媒体升降模型、多媒体互动模型、声光影模型、虚拟立体动画模型等领域的模型设计、模型制作服务及精品模型技术开发。经过多年的努力发展,目前是中国模型行业中技术实力雄厚、发展较快、影响力强的专业模..\n\n全国模型价格免费在线查询系统"
},
{
"url": "http://caifuhao.eastmoney.com/news/20240225070209973371220",
"title": "在中国境内,已知公开的大型模型公司主要包括以下几个:",
"text": "在中国境内,已知公开的大型模型公司主要包括以下几个:\n1.**华为**:华为是一家全球领先的信息与通信技术(ICT)解决方案提供商,也积极参与大模型技术的研发。\n2.**百度**:百度推出的大模型产品包括“文心一言”,这是一款基于大语言模型的生成式AI产品。\n3.**阿里巴巴**:阿里巴巴也是大模型技术的重要参与者之一。\n4.**腾讯**:腾讯在大模型领域也有所布局,是其人工智能战略的一部分。\n5.**字节跳动**:字节跳动同样在大模型技术方面有所发展。\n6.**京东**:京东作为电商巨头,也在探索大模型技术的应用。\n7.**科大讯飞**:科大讯飞主要专注于语音识别和人工智能领域,大模型技术也是其研究的一部分。\n8.**商汤科技**:商汤科技是一家专注于计算机视觉和深度学习技术的公司,也在大模型领域有所发展。\n9.**360公司**:360公司也在大模型技术方面有所探索。\n10.**实在智能**:实在智能是一家专注于人工智能和大模型技术的公司。\n11.**中国科学院自动化研究所**:作为一家研究机构,中国科学院自动化研究所也在大模型领域有所贡献。\n12.**智谱AI**:智谱AI是一家专注于大模型技术的创业公司,已经开发出了多代大模型产品。\n这些公司代表了中国在大模型技术方面的主要力量,涵盖了从互联网巨头到初创公司的不同类型企业。随着人工智能技术的不断发展,这些公司在未来的表现值得期待。"
},
{
"url": "https://caifuhao.eastmoney.com/news/20240225085325506722330",
"title": "中国境内大模型公司排名,你认同吗?",
"text": "在中国境内,已知公开的大型模型公司主要包括以下几个:\n1.**华为**:华为是一家全球领先的信息与通信技术(ICT)解决方案提供商,也积极参与大模型技术的研发。\n2.**百度**:百度推出的大模型产品包括“文心一言”,这是一款基于大语言模型的生成式AI产品。\n3.**阿里巴巴**:阿里巴巴也是大模型技术的重要参与者之一。\n4.**腾讯**:腾讯在大模型领域也有所布局,是其人工智能战略的一部分。\n5.**字节跳动**:字节跳动同样在大模型技术方面有所发展。\n6.**京东**:京东作为电商巨头,也在探索大模型技术的应用。\n7.**科大讯飞**:科大讯飞主要专注于语音识别和人工智能领域,大模型技术也是其研究的一部分。\n8.**商汤科技**:商汤科技是一家专注于计算机视觉和深度学习技术的公司,也在大模型领域有所发展。\n9.**360公司**:360公司也在大模型技术方面有所探索。\n10.**实在智能**:实在智能是一家专注于人工智能和大模型技术的公司。\n11.**中国科学院自动化研究所**:作为一家研究机构,中国科学院自动化研究所也在大模型领域有所贡献。\n12.**智谱AI**:智谱AI是一家专注于大模型技术的创业公司,已经开发出了多代大模型产品。\n这些公司代表了中国在大模型技术方面的主要力量,涵盖了从互联网巨头到初创公司的不同类型企业。随着人工智能技术的不断发展,这些公司在未来的表现值得期待。\n郑重声明:用户在财富号/股吧/博客社区发表的所有信息(包括但不限于文字、视频、音频、数据及图表)仅代表个人观点,与本网站立场无关,不对您构成任何投资建议,据此操作风险自担。请勿相信代客理财、免费荐股和炒股培训等宣传内容,远离非法证券活动。请勿添加发言用户的手机号码、公众号、、微信及QQ等信息,谨防上当受骗!\n全部评论 (3)\n最新\n股友wrMtDD\n来自:广东\n小米金山呢?\n26分钟前\n回复\n静谧的慕容京城1\n来自:河北\n华宇:法律大模型\n37分钟前\n回复\n科技前沿报道\n来自:江苏\n股价最便宜的!佳都科技排名第29名!股价不到六块!\n39分钟前\n回复\n写评论 ...\n13\n3"
},
{
"url": "https://www.toutiao.com/article/7353226433517175305/",
"title": "国内AI大模型厂商混战:谁是“顶流”第一阵营?",
"text": "据不完全统计中国大模型厂商数量目前约有200家,其中,通用及金融垂直领域大模型落地最快,北京、上海、深圳推出大模型的数量稳居前三。\n\n大模型厂商类型方面,国内互联网科技公司纷纷入局,包括百度、阿里、腾讯、华为等大厂,科大讯飞、商汤科技、旷视科技等垂直于AI领域的厂商,以及智谱华章、百川智能、月之暗面等大模型初创企业,还有金融、汽车、教育、智能家居、消费电子等垂直行业企业也基于垂直领域人工智能技术和数据积累等能力,自主研发大模型。\n\n此外,高校、研究机构也是大模型研发的重要力量,约有近四分之一的大模型由高校、研究机构发布,中国科学院下属多个院所、北京大学、清华大学、复旦大学、浙江大学等高校都发布了自己的大模型。\n\n大模型具有广泛的应用场景,以下是一些常见的场景:\n\n\n\n1. 自然语言处理:如文本生成、机器翻译、问答系统等。\n\n2. 语音识别与合成:实现语音交互和语音合成。\n\n3. 图像识别与处理:包括图像分类、目标检测等。\n\n4. 智能客服:提供在线客服支持。\n\n5. 智能推荐系统:根据用户偏好进行内容推荐。\n\n6. 情感分析:了解文本的情感倾向。\n\n7. 自动化决策:在金融、医疗等领域辅助决策。\n\n8. 智能写作:辅助文章创作、文案撰写等。\n\n9. 智能助手:如智能家居助手。\n\n10. 金融风险评估:分析信用风险等。\n\n11. 医疗诊断辅助:帮助医生进行诊断。\n\n12. 工业自动化:用于工厂的智能控制和监测。\n\n13. 教育领域:个性化学习、辅导等。\n\n14. 虚拟助手:提供各种信息和服务。\n\n15. 市场预测:分析市场趋势和预测需求。\n\n国内知名厂商大模型阵营分析:\n\n大模型产品 出品厂商 一、顶流梯队:\n\n文心一言 北京百度网讯科技有限公司 智谱清言(ChatGLM) 北京智谱华章科技有限公司 云雀大模型 北京抖音信息服务有限公司 百应 北京百川智能科技有限公司 紫东太初大模型开放平台 中国科学院自动化研究所 日日新 上海商汤智能科技有限公司 星火认知大模型 科大讯飞股份有限公司 360智脑大模型 三六零科技集团有限公司 通义千问大模型 阿里巴巴达摩院(杭州)科技有限公司 腾讯混元助手大模型 深圳市腾讯计算机系统有限公司 华为云盘古NLP大模型 华为云计算技术有限公司 序列猴子 出门问问信息科技有限公司 Moonshot 北京月之暗面科技有限公司 “天工”大模型 昆仑万维科技股份有限公司 WPS AI 北京金山办公软件股份有限公司 奇元大模型 北京奇虎科技有限公司 美团大模型“通慧” 北京三快科技有限公司 子曰 北京网易有道计算机系统有限公司 AntGLM 蚂蚁金服(杭州)网络技术有限公司 网易邮箱智能助手大模型 广州网易计算机系统有限公司 二、王者梯队:\n\n无涯(Infinity) 星环信息科技(上海)股份有限公司 东风(DFM)语言大模型 思必驰科技股份有限公司 CVTE大模型 广州视源电子科技股份有限公司 浪潮海若大模型 浪潮云信息技术股份公司 言犀 京东科技信息技术有限公司 福禄瓜 北京字跳网络技术有限公司 雅意大模型 北京中科闻歌科技股份有限公司 山海认知大模型 云知声智能科技股份有限公司 快意大模型 北京快手科技有限公司 易生诸相 网易(上海)网络游戏有限公司 朝彻大模型 广州唯品会数据科技有限公司 云从从容大模型 云从科技集团股份有限公司 小爱同学AI助手 小米科技有限责任公司 脉脉APP 智能问答功能 北京淘友天下科技发展有限公司 智联招聘APP--AI改简历新功能 北京网聘信息技术有限公司 阅爱聊小程序 掌阅科技股份有限公司 GiantGPT 上海巨人网络科技有限公司 蜜巢 上海蜜度蜜巢智能科技有限公司 松鼠Ai教育大模型 上海松鼠云上人工智能技术有限公司 九天自然语言交互大模型 中国移动通信有限公司 星辰大模型 淘宝(中国)软件有限公司 百业灵犀大模型(LinSeer) 新华三技术有限公司 佳都知行大模型 佳都科技集团股份有限公司 天禧大模型 联想(北京)有限公司 爱奇艺AI对话产品 北京爱奇艺科技有限公司 贝壳梦想家大模型 贝壳找房(北京)科技有限公司 天地大模型 汉王科技股份有限公司 滴滴出行大模型 滴滴出行(北京)网络平台技术有限公司 AndesGPT-LVM OPPO广东移动通信有限公司 印象大语言模型 北京印象笔记科技有限公司\n\n\n\n\n\n免责声明:本号对所有原创、转载文章陈述与观点均保持中立,内容仅供读者学习和交流。文章、图片等版权归原作者享有,如有侵权,请留言联系更正或删除。如有不足之处请批评指正,谢谢。"
},
{
"url": "https://www.waytoagi.com/question/56972",
"title": "中国头部的大模型公司分别是哪几家?",
"text": "Home / Question List / Question Detail\n\n中国头部的大模型公司分别是哪几家? Answer 以下是中国头部的大模型公司: 北京: 百度(文心一言):https://wenxin.baidu.com 抖音(云雀大模型):https://www.doubao.com 智谱 AI(GLM 大模型):https://chatglm.cn 中科院(紫东太初大模型):https://xihe.mindspore.cn 百川智能(百川大模型):https://www.baichuan-ai.com/\n\n上海: 商汤(日日新大模型):https://www.sensetime.com/ MiniMax(ABAB 大模型):https://api.minimax.chat 上海人工智能实验室(书生通用大模型):https://intern-ai.org.cn\n\n此外,字节和腾讯都有团队在和外部大模型合作。创业公司中表现出色的有: 智谱:一年间推出了 4 代 GLM,一直是国内能力最好的模型之一。\n\nMiniMax:推出了 MoE 架构的新模型,和“星野”这个目前国内最成功的 AI 陪聊 APP。\n\n月之暗面:专注长 Token 能力,在记忆力和长 Token 能力上可圈可点。 通过 SuperCLUE 测评结果发现,文心一言 4.0、通义千问 2.0 引领国内大模型的研发进度,部分高质量大模型也在闭源应用和开源生态中形成自己独特的优势。从国内 TOP19 大模型的数量来看,创业公司和大厂的占比几乎持平。 Content generated by AI large model, please carefully verify (powered by aily) References 8月正式上线的国内大模型 5⃣️五家北京企业机构:百度(文心一言)https://wenxin.baidu.com抖音(云雀大模型)https://www.doubao.com智谱AI(GLM大模型)https://chatglm.cn中科院(紫东太初大模型)https://xihe.mindspore.cn百川智能(百川大模型)https://www.baichuan-ai.com/3⃣️三家上海企业机构:商汤(日日新大模型)https://www.sensetime.com/MiniMax(ABAB大模型)https://api.minimax.chat上海人工智能实验室(书生通用大模型)https://intern-ai.org.cn今天这8个大模型,在聊天状态下——能生成Markdown格式的:智谱清言、商量Sensechat、MiniMax目前不能进行自然语言交流的:昇思(可以对文本进行是否由AI生成的检测,类似论文查重,准确度不错)、书生受限制使用:MiniMax(无法对生成的文本进行复制输出,且只有15元的预充值额度进行体验,完成企业认证后可以进行充值)特色功能:昇思——生图,MiniMax——语音合成 AGI万字长文(上)| 2023回顾与反思 另外,字节和腾讯都有团队在和外部大模型合作,很多外服务和产品也不是用的自己的模型。创业公司。目前明确看到有好模型、好产品的第一梯队公司大概如下:智谱:一年间推出了4代GLM,一直是国内能力最好的模型之一MiniMax:推出了MoE架构的新模型,和”星野“这个目前国内最成功的AI陪聊APP月之暗面:专注长Token能力,在记忆力和长Token能力上可圈可点 2023年度中文大模型基准测评报告.pdf [title]VIRTUAL中文大模型基准测评2023年度报告[heading2]国内外大模型总体表现[heading3]国内大模型竞争格局国内大模型综合表现-SuperCLUE通过SuperCLUE测评结果发现,国内大模型的第一梯队有了更多新的模型加入。头部模型如文心一言4.0、通义千问2.0引领国内大模型的研发进度,部分高质量大模型紧追不舍,分别在闭源应用和开源生态中形成自己独特的优势。创业公司(9)vs大厂(10)创业公司大厂•从国内TOP19大模型的数量来看,创业公司和大厂的占比几乎持平。大厂和创业公司平均成绩对比\n\nAsk Again\n\nOthers are asking"
},
{
"url": "http://www.chnmodel.com/qzgl/2018-07-18/4355.html",
"title": "中国模型网",
"text": "沙盘模型做好了怎样验收比较合理\n售楼处沙盘模型设计制作细节\n楼盘沙盘模型尺寸如何定\n沙盘模型后期维护及技术支持\n如何制作售楼处展示沙盘模型\n一套好的沙盘模型是怎样制作出来的\n14410"
},
{
"url": "https://max.book118.com/html/2024/0125/6000210030010041.shtm",
"title": "中国大模型产业研究报告.pdf",
"text": "202309\n年月\n中国大模型\n目录\n一、产业图谱3\n二、产业总览4\n(一)企业总量4\n(二)企业结构4\n三、创新活力8\n(一)授权专利8\n(二)参研标准9\n(三)研发机构10\n四、股权融资11\n(一)总量规模11\n(二)地域分布13\n(三)融资轮次14\n(四)投资机构15\n1\n2\n一、产业图谱\n大模型产业由基础层、模型层、中间层、应用层4个一级环节构成,\n含19个二级环节、47个三级环节。\n图1大模型产业链\n3\n二、产业总览\n(一)企业总量\n截至2023年08月,全国大模型存量企业78,208家。其中2022年\n有存量企业66,293家,同比增加28.59%。\n图22018年至2023年08月全国大模型存量企业数量变化趋势\n(二)企业结构\n1.产业金字塔\n截至2023年08月,全国大模型有上市企业533家,占比0.68%;\n专精特新企业592家,占比0.76%;国家高新技术企业8,518家,占比\n10.89%。\n图3截至2023年08月全国大模型企业金字塔\n4\n2.注册资本\n截至2023年08月,全国大模型注册资金在100万-500万的企业最\n多,有28,532家,占比为36.48%;其次是注册资金在1000万-5000万\n的企业,有19,234家,占比为24.59%。\n图4截至2023年08月全国大模型企业注册资本分布\n5\n3.地域分布\n截至2023年08月,全国大模型企业数量广东省、北京市和江苏省\n位列前三,分别有20,191、8,152和5,637家。\n图5截至2023年08月全国大模型企业区域分布\n6\n表2截至2023年08月全国大模型重点企业区域分布\n排行区域企业链主企业上市企业专精特新高新技术\n1广东省20191422883511459\n2北京市815210341459232120\n3江苏省563714050113653\n4上海市5251666596391066\n5浙江省441120251164586\n6山东省4398137"
}
]
\ No newline at end of file
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论