Appearance
🚀 Python 实践项目
项目概述
通过实际项目来巩固和运用所学的 Python 知识。这些项目涵盖了从基础到进阶的各种应用场景,帮助你提升编程技能和解决实际问题的能力。
💡 学习建议:
- 从简单项目开始,逐步增加复杂度
- 注重代码质量和最佳实践
- 多思考项目的扩展性和优化
- 记录学习过程和遇到的问题
🎯 项目1:个人任务管理器
项目描述
创建一个命令行任务管理器,支持添加、删除、查看和完成任务。
功能需求
- 添加新任务
- 标记任务完成
- 删除任务
- 查看所有任务
- 按状态筛选任务
- 数据持久化
实现代码
python
import json
import os
from datetime import datetime
from typing import List, Dict
class Task:
    """A single to-do item with creation and completion timestamps."""

    def __init__(self, title: str, description: str = ""):
        """Create a pending task stamped with the current local time.

        The id is the creation time formatted down to the second, so two
        tasks created within the same second get the same id; whoever stores
        tasks is responsible for keeping ids distinct.
        """
        now = datetime.now()
        self.id = now.strftime("%Y%m%d%H%M%S")
        self.title = title
        self.description = description
        self.created_at = now.isoformat()
        self.completed = False
        self.completed_at = None

    def complete(self):
        """Mark the task as done and record when that happened."""
        self.completed = True
        self.completed_at = datetime.now().isoformat()

    def to_dict(self):
        """Return a JSON-serialisable dict holding every field of the task."""
        return {
            "id": self.id,
            "title": self.title,
            "description": self.description,
            "created_at": self.created_at,
            "completed": self.completed,
            "completed_at": self.completed_at,
        }

    @classmethod
    def from_dict(cls, data: Dict) -> "Task":
        """Rebuild a task from a dict produced by ``to_dict``.

        ``id``, ``title`` and ``created_at`` are required (``KeyError`` when
        absent).  ``description``, ``completed`` and ``completed_at`` are
        optional and fall back to the values of a freshly created task, so a
        hand-edited or older data file that omits them still loads.
        """
        task = cls(data["title"], data.get("description", ""))
        task.id = data["id"]
        task.created_at = data["created_at"]
        task.completed = data.get("completed", False)
        task.completed_at = data.get("completed_at")
        return task
class TaskManager:
    """Keeps a list of tasks in memory and mirrors it to a JSON file."""

    def __init__(self, data_file: str = "tasks.json"):
        """Remember the storage path and load any tasks already saved there."""
        self.data_file = data_file
        self.tasks: List[Task] = []
        self.load_tasks()

    def load_tasks(self):
        """Load tasks from ``self.data_file`` if that file exists.

        A missing file leaves the current list untouched.  An unreadable or
        malformed file is reported and the list is reset to empty.
        NOTE: after such a failure the next successful save overwrites the
        unreadable file with the (empty) in-memory list.
        """
        if not os.path.exists(self.data_file):
            return
        try:
            with open(self.data_file, 'r', encoding='utf-8') as f:
                raw = json.load(f)
            self.tasks = [Task.from_dict(item) for item in raw]
        except (OSError, ValueError, KeyError, TypeError, AttributeError) as e:
            # ValueError covers json.JSONDecodeError and decoding errors; the
            # remaining types cover entries that do not look like task dicts.
            print(f"加载任务时发生错误: {e}")
            self.tasks = []

    def save_tasks(self):
        """Write every task to ``self.data_file`` as indented UTF-8 JSON."""
        try:
            data = [task.to_dict() for task in self.tasks]
            with open(self.data_file, 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
        except (OSError, TypeError, ValueError) as e:
            print(f"保存任务时发生错误: {e}")

    def _unique_id(self, base: str) -> str:
        """Return ``base``, or ``base-N`` with the smallest free N if taken."""
        taken = {task.id for task in self.tasks}
        if base not in taken:
            return base
        suffix = 1
        while f"{base}-{suffix}" in taken:
            suffix += 1
        return f"{base}-{suffix}"

    def add_task(self, title: str, description: str = ""):
        """Create a task, store it, persist the list and return the new task.

        Task ids only have one-second resolution, so the id is de-duplicated
        against the stored tasks; otherwise complete/delete by id could hit
        the wrong task.
        """
        task = Task(title, description)
        task.id = self._unique_id(task.id)
        self.tasks.append(task)
        self.save_tasks()
        print(f"任务 '{title}' 已添加")
        return task

    def complete_task(self, task_id: str):
        """Mark the task with ``task_id`` as done; return whether it existed."""
        for task in self.tasks:
            if task.id == task_id:
                task.complete()
                self.save_tasks()
                print(f"任务 '{task.title}' 已完成")
                return True
        print("任务未找到")
        return False

    def delete_task(self, task_id: str):
        """Remove the task with ``task_id``; return whether it existed."""
        for i, task in enumerate(self.tasks):
            if task.id == task_id:
                deleted_task = self.tasks.pop(i)
                self.save_tasks()
                print(f"任务 '{deleted_task.title}' 已删除")
                return True
        print("任务未找到")
        return False

    def list_tasks(self, show_completed: bool = True):
        """Print the tasks, optionally hiding the completed ones.

        Prints the "no tasks" message when nothing would be shown, including
        the case where every task is filtered out.
        """
        visible = [
            task for task in self.tasks
            if show_completed or not task.completed
        ]
        if not visible:
            print("没有任务")
            return
        print(f"\n{'='*50}")
        print("任务列表")
        print(f"{'='*50}")
        for task in visible:
            status = "✅ 已完成" if task.completed else "⏳ 进行中"
            print(f"\nID: {task.id}")
            print(f"标题: {task.title}")
            if task.description:
                print(f"描述: {task.description}")
            print(f"状态: {status}")
            print(f"创建时间: {task.created_at}")
            if task.completed_at:
                print(f"完成时间: {task.completed_at}")
            print("-" * 30)

    def get_stats(self):
        """Print total, completed and pending counts plus the completion rate."""
        total = len(self.tasks)
        completed = sum(1 for task in self.tasks if task.completed)
        pending = total - completed
        print(f"\n任务统计:")
        print(f"总任务数: {total}")
        print(f"已完成: {completed}")
        print(f"待完成: {pending}")
        if total > 0:
            completion_rate = (completed / total) * 100
            print(f"完成率: {completion_rate:.1f}%")
def main():
    """Run the interactive menu loop of the personal task manager."""
    manager = TaskManager()
    divider = "=" * 50
    menu = (
        "\n" + divider,
        "个人任务管理器",
        divider,
        "1. 添加任务",
        "2. 完成任务",
        "3. 删除任务",
        "4. 查看所有任务",
        "5. 查看待完成任务",
        "6. 查看任务统计",
        "0. 退出",
    )
    while True:
        for line in menu:
            print(line)
        choice = input("\n请选择操作 (0-6): ").strip()

        if choice == "0":
            print("感谢使用任务管理器!")
            break

        if choice == "1":
            # Both prompts are asked before the title is validated.
            title = input("请输入任务标题: ").strip()
            description = input("请输入任务描述 (可选): ").strip()
            if not title:
                print("任务标题不能为空")
            else:
                manager.add_task(title, description)
        elif choice == "2":
            manager.list_tasks()
            task_id = input("请输入要完成的任务ID: ").strip()
            if task_id:
                manager.complete_task(task_id)
        elif choice == "3":
            manager.list_tasks()
            task_id = input("请输入要删除的任务ID: ").strip()
            if task_id:
                # Deletion only proceeds on an explicit "y".
                answer = input("确认删除? (y/N): ").strip().lower()
                if answer == 'y':
                    manager.delete_task(task_id)
        elif choice == "4":
            manager.list_tasks()
        elif choice == "5":
            manager.list_tasks(show_completed=False)
        elif choice == "6":
            manager.get_stats()
        else:
            print("无效选择,请重试")
if __name__ == "__main__":
main()
🎯 项目2:学生成绩管理系统
项目描述
创建一个学生成绩管理系统,支持学生信息管理和成绩统计。
功能需求
- 添加学生信息
- 录入学生成绩
- 计算平均分和排名
- 生成成绩报告
- 数据导入导出
实现代码
python
import csv
import json
from typing import List, Dict, Optional
from dataclasses import dataclass, asdict
@dataclass
class Student:
    """Student record: identity fields plus a subject-to-score mapping."""
    student_id: str
    name: str
    age: int
    class_name: str
    scores: Dict[str, float] = None

    def __post_init__(self):
        """Give the instance its own empty score table when none was passed."""
        if self.scores is None:
            self.scores = {}

    def add_score(self, subject: str, score: float):
        """Store ``score`` for ``subject`` if it lies within 0..100.

        Returns True when the score was stored, False when it was rejected.
        """
        if not (0 <= score <= 100):
            return False
        self.scores[subject] = score
        return True

    def get_average_score(self) -> float:
        """Return the mean of all recorded scores, or 0.0 when there are none."""
        count = len(self.scores)
        if count == 0:
            return 0.0
        return sum(self.scores.values()) / count

    def get_total_score(self) -> float:
        """Return the sum of all recorded scores."""
        total = sum(self.scores.values())
        return total
class GradeManager:
    """In-memory student roster with score entry, ranking, reports and CSV I/O."""

    def __init__(self):
        """Start with an empty roster and the fixed list of subjects."""
        self.students: List[Student] = []
        self.subjects = ["语文", "数学", "英语", "物理", "化学"]

    def add_student(self, student_id: str, name: str, age: int, class_name: str):
        """Add a student unless the id is already taken; return success."""
        if any(s.student_id == student_id for s in self.students):
            print(f"学号 {student_id} 已存在")
            return False
        student = Student(student_id, name, age, class_name)
        self.students.append(student)
        print(f"学生 {name} 添加成功")
        return True

    def input_scores(self, student_id: str):
        """Interactively read one score per subject for the given student.

        Each subject is re-prompted until a number within 0..100 is entered.
        Returns False when the student does not exist, True otherwise.
        """
        student = self.find_student(student_id)
        if not student:
            print("学生不存在")
            return False
        print(f"为 {student.name} 录入成绩:")
        for subject in self.subjects:
            while True:
                try:
                    score = float(input(f"{subject} 成绩 (0-100): "))
                except ValueError:
                    print("请输入有效数字")
                    continue
                if student.add_score(subject, score):
                    break
                print("成绩必须在 0-100 之间")
        print("成绩录入完成")
        return True

    def find_student(self, student_id: str) -> Optional["Student"]:
        """Return the student with ``student_id``, or None when not found."""
        for student in self.students:
            if student.student_id == student_id:
                return student
        return None

    def calculate_rankings(self) -> List["Student"]:
        """Return the students sorted by average score, highest first."""
        return sorted(self.students, key=lambda s: s.get_average_score(), reverse=True)

    def generate_report(self):
        """Print the ranking table followed by per-class averages."""
        if not self.students:
            print("没有学生数据")
            return
        print("\n" + "="*80)
        print("学生成绩报告")
        print("="*80)
        rankings = self.calculate_rankings()
        print(f"{'排名':<4} {'学号':<12} {'姓名':<8} {'班级':<10} {'平均分':<8} {'总分':<8}")
        print("-" * 80)
        for i, student in enumerate(rankings, 1):
            avg_score = student.get_average_score()
            total_score = student.get_total_score()
            print(f"{i:<4} {student.student_id:<12} {student.name:<8} "
                  f"{student.class_name:<10} {avg_score:<8.1f} {total_score:<8.1f}")
        # Per-class statistics: the mean of the students' average scores.
        print("\n" + "="*50)
        print("班级统计")
        print("="*50)
        class_stats = {}
        for student in self.students:
            class_stats.setdefault(student.class_name, []).append(
                student.get_average_score()
            )
        for class_name, scores in class_stats.items():
            avg_score = sum(scores) / len(scores)
            print(f"{class_name}: 平均分 {avg_score:.1f}, 学生数 {len(scores)}")

    def export_to_csv(self, filename: str):
        """Write the roster to ``filename`` as CSV; return success.

        A subject without a recorded score is written as an empty cell rather
        than 0, so that re-importing the file does not invent zero scores and
        change the student's average.
        """
        try:
            with open(filename, 'w', newline='', encoding='utf-8') as f:
                writer = csv.writer(f)
                headers = ["学号", "姓名", "年龄", "班级"] + self.subjects + ["平均分", "总分"]
                writer.writerow(headers)
                for student in self.students:
                    row = [
                        student.student_id,
                        student.name,
                        student.age,
                        student.class_name,
                    ]
                    row.extend(student.scores.get(subject, "") for subject in self.subjects)
                    row.append(student.get_average_score())
                    row.append(student.get_total_score())
                    writer.writerow(row)
        except (OSError, csv.Error) as e:
            print(f"导出失败: {e}")
            return False
        print(f"数据已导出到 {filename}")
        return True

    def import_from_csv(self, filename: str):
        """Append the students found in ``filename`` to the roster; return success.

        The import is all-or-nothing: rows are collected first and only added
        once the whole file parsed, so a bad row cannot leave a partial
        import behind.  Rows whose id already exists (in the roster or
        earlier in the file) are skipped with a message.  The printed count
        is the number of students actually imported.
        """
        known_ids = {s.student_id for s in self.students}
        imported = []
        try:
            # utf-8-sig also accepts plain UTF-8 and strips a leading BOM,
            # which would otherwise become part of the first column name.
            with open(filename, 'r', newline='', encoding='utf-8-sig') as f:
                for row in csv.DictReader(f):
                    student_id = row["学号"]
                    if student_id in known_ids:
                        print(f"学号 {student_id} 已存在")
                        continue
                    student = Student(student_id, row["姓名"], int(row["年龄"]), row["班级"])
                    for subject in self.subjects:
                        raw_score = row.get(subject)
                        if raw_score:
                            student.add_score(subject, float(raw_score))
                    known_ids.add(student_id)
                    imported.append(student)
        except (OSError, KeyError, ValueError, TypeError, csv.Error) as e:
            print(f"导入失败: {e}")
            return False
        self.students.extend(imported)
        print(f"从 {filename} 导入 {len(imported)} 个学生")
        return True
def main():
    """Run the interactive menu loop of the grade management system."""
    manager = GradeManager()
    while True:
        print("\n" + "="*50)
        print("学生成绩管理系统")
        print("="*50)
        print("1. 添加学生")
        print("2. 录入成绩")
        print("3. 查看学生信息")
        print("4. 生成成绩报告")
        print("5. 导出数据")
        print("6. 导入数据")
        print("0. 退出")
        choice = input("\n请选择操作 (0-6): ").strip()
        if choice == "1":
            student_id = input("学号: ").strip()
            name = input("姓名: ").strip()
            try:
                age = int(input("年龄: "))
            except ValueError:
                # A non-numeric age used to abort the whole program; report
                # it and go back to the menu instead.
                print("请输入有效数字")
                continue
            class_name = input("班级: ").strip()
            manager.add_student(student_id, name, age, class_name)
        elif choice == "2":
            student_id = input("请输入学号: ").strip()
            manager.input_scores(student_id)
        elif choice == "3":
            student_id = input("请输入学号: ").strip()
            student = manager.find_student(student_id)
            if student:
                print(f"\n学生信息:")
                print(f"学号: {student.student_id}")
                print(f"姓名: {student.name}")
                print(f"年龄: {student.age}")
                print(f"班级: {student.class_name}")
                print(f"成绩: {student.scores}")
                print(f"平均分: {student.get_average_score():.1f}")
            else:
                print("学生不存在")
        elif choice == "4":
            manager.generate_report()
        elif choice == "5":
            filename = input("导出文件名 (默认: students.csv): ").strip()
            if not filename:
                filename = "students.csv"
            manager.export_to_csv(filename)
        elif choice == "6":
            filename = input("导入文件名: ").strip()
            if filename:
                manager.import_from_csv(filename)
        elif choice == "0":
            print("感谢使用成绩管理系统!")
            break
        else:
            print("无效选择,请重试")
if __name__ == "__main__":
main()
🎯 项目3:简单博客系统
项目描述
创建一个简单的博客系统,支持文章的发布、编辑和查看。
功能需求
- 发布新文章
- 编辑现有文章
- 查看文章列表
- 搜索文章
- 文章分类管理
实现代码
python
import json
import os
from datetime import datetime
from typing import List, Dict, Optional
from dataclasses import dataclass, asdict
@dataclass
class Article:
    """A blog article together with its metadata and publication state."""
    id: str
    title: str
    content: str
    author: str
    category: str
    tags: List[str]
    created_at: str
    updated_at: str
    published: bool = False

    def update(self, title: Optional[str] = None, content: Optional[str] = None,
               category: Optional[str] = None, tags: Optional[List[str]] = None):
        """Change the given fields and refresh ``updated_at``.

        ``title``, ``content`` and ``category`` are only applied when
        non-empty, so an empty string means "keep the current value".
        ``tags`` is applied whenever it is not None, which lets a caller
        clear the tags by passing an empty list.
        """
        if title:
            self.title = title
        if content:
            self.content = content
        if category:
            self.category = category
        if tags is not None:
            # Copy so later changes to the caller's list do not leak in.
            self.tags = list(tags)
        self.updated_at = datetime.now().isoformat()

    def publish(self):
        """Mark the article as published and refresh ``updated_at``."""
        self.published = True
        self.updated_at = datetime.now().isoformat()

    def unpublish(self):
        """Revert the article to draft state and refresh ``updated_at``."""
        self.published = False
        self.updated_at = datetime.now().isoformat()
class BlogSystem:
    """Stores articles and categories in memory and mirrors them to a JSON file."""

    def __init__(self, data_file: str = "blog_data.json"):
        """Set the default categories and load any data already saved."""
        self.data_file = data_file
        self.articles: List[Article] = []
        self.categories = ["技术", "生活", "学习", "其他"]
        self.load_data()

    def load_data(self):
        """Load articles and categories from ``self.data_file`` if it exists.

        A missing file leaves the current state untouched.  An unreadable or
        malformed file is reported and the article list is reset to empty.
        """
        if not os.path.exists(self.data_file):
            return
        try:
            with open(self.data_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
            self.articles = [Article(**article_data) for article_data in data.get("articles", [])]
            self.categories = data.get("categories", self.categories)
        except (OSError, ValueError, TypeError, AttributeError) as e:
            # ValueError covers json.JSONDecodeError; TypeError/AttributeError
            # cover data that does not have the expected dict layout.
            print(f"加载数据时发生错误: {e}")
            self.articles = []

    def save_data(self):
        """Write all articles and categories to ``self.data_file`` as JSON."""
        try:
            data = {
                "articles": [asdict(article) for article in self.articles],
                "categories": self.categories,
            }
            with open(self.data_file, 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
        except (OSError, TypeError, ValueError) as e:
            print(f"保存数据时发生错误: {e}")

    def _unique_id(self, base: str) -> str:
        """Return ``base``, or ``base-N`` with the smallest free N if taken."""
        taken = {article.id for article in self.articles}
        if base not in taken:
            return base
        suffix = 1
        while f"{base}-{suffix}" in taken:
            suffix += 1
        return f"{base}-{suffix}"

    def create_article(self, title: str, content: str, author: str,
                       category: str, tags: List[str]):
        """Create a draft article, persist it and return it.

        The id is the creation time to the second, de-duplicated against the
        stored articles so that two articles created within the same second
        remain individually addressable.
        """
        now = datetime.now()
        timestamp = now.isoformat()
        article = Article(
            id=self._unique_id(now.strftime("%Y%m%d%H%M%S")),
            title=title,
            content=content,
            author=author,
            category=category,
            tags=tags,
            created_at=timestamp,
            updated_at=timestamp,
        )
        self.articles.append(article)
        self.save_data()
        print(f"文章 '{title}' 创建成功")
        return article

    def find_article(self, article_id: str) -> Optional["Article"]:
        """Return the article with ``article_id``, or None when not found."""
        for article in self.articles:
            if article.id == article_id:
                return article
        return None

    def list_articles(self, published_only: bool = True):
        """Print the articles, by default only the published ones."""
        filtered_articles = self.articles
        if published_only:
            filtered_articles = [a for a in self.articles if a.published]
        if not filtered_articles:
            print("没有找到文章")
            return
        print(f"\n{'='*80}")
        print("文章列表")
        print(f"{'='*80}")
        for article in filtered_articles:
            status = "已发布" if article.published else "草稿"
            print(f"\nID: {article.id}")
            print(f"标题: {article.title}")
            print(f"作者: {article.author}")
            print(f"分类: {article.category}")
            print(f"标签: {', '.join(article.tags)}")
            print(f"状态: {status}")
            print(f"创建时间: {article.created_at}")
            print(f"更新时间: {article.updated_at}")
            print("-" * 50)

    def search_articles(self, keyword: str):
        """Print articles whose title, content, category or tags contain ``keyword``.

        Matching is a case-insensitive substring test.
        """
        keyword_lower = keyword.lower()
        results = [
            article for article in self.articles
            if (keyword_lower in article.title.lower()
                or keyword_lower in article.content.lower()
                or keyword_lower in article.category.lower()
                or any(keyword_lower in tag.lower() for tag in article.tags))
        ]
        if not results:
            print(f"没有找到包含 '{keyword}' 的文章")
            return
        print(f"\n找到 {len(results)} 篇相关文章:")
        for article in results:
            print(f"- {article.title} (作者: {article.author})")

    def get_category_stats(self):
        """Print, per category, the total and published article counts."""
        stats = {}
        for article in self.articles:
            entry = stats.setdefault(article.category, {"total": 0, "published": 0})
            entry["total"] += 1
            if article.published:
                entry["published"] += 1
        print("\n分类统计:")
        for category, data in stats.items():
            print(f"{category}: 总计 {data['total']} 篇, 已发布 {data['published']} 篇")
def main():
    """Run the interactive menu loop of the simple blog system."""
    blog = BlogSystem()
    divider = "=" * 50
    menu = (
        "\n" + divider,
        "简单博客系统",
        divider,
        "1. 发布新文章",
        "2. 编辑文章",
        "3. 查看文章列表",
        "4. 查看文章详情",
        "5. 搜索文章",
        "6. 发布/取消发布",
        "7. 分类统计",
        "0. 退出",
    )
    while True:
        for line in menu:
            print(line)
        choice = input("\n请选择操作 (0-7): ").strip()

        if choice == "0":
            print("感谢使用博客系统!")
            break

        if choice == "1":
            title = input("文章标题: ").strip()
            content = input("文章内容: ").strip()
            author = input("作者: ").strip()
            print("可选分类:", ", ".join(blog.categories))
            category = input("分类: ").strip()
            tags_input = input("标签 (用逗号分隔): ").strip()
            # Split on commas and drop entries that are blank after trimming.
            tags = []
            for piece in tags_input.split(","):
                cleaned = piece.strip()
                if cleaned:
                    tags.append(cleaned)
            if not (title and content and author and category):
                print("请填写完整信息")
            else:
                blog.create_article(title, content, author, category, tags)
        elif choice == "2":
            blog.list_articles(published_only=False)
            article_id = input("请输入文章ID: ").strip()
            article = blog.find_article(article_id)
            if not article:
                print("文章不存在")
            else:
                print(f"\n编辑文章: {article.title}")
                new_title = input(f"新标题 (当前: {article.title}): ").strip()
                new_content = input(f"新内容 (当前: {article.content[:50]}...): ").strip()
                new_category = input(f"新分类 (当前: {article.category}): ").strip()
                if new_title or new_content or new_category:
                    # A blank answer keeps the field's current value.
                    article.update(
                        title=new_title or article.title,
                        content=new_content or article.content,
                        category=new_category or article.category,
                    )
                    blog.save_data()
                    print("文章更新成功")
        elif choice == "3":
            answer = input("显示所有文章? (y/N): ").strip().lower()
            show_all = answer == 'y'
            blog.list_articles(published_only=not show_all)
        elif choice == "4":
            article_id = input("请输入文章ID: ").strip()
            article = blog.find_article(article_id)
            if not article:
                print("文章不存在")
            else:
                print(f"\n{'='*60}")
                print(f"标题: {article.title}")
                print(f"作者: {article.author}")
                print(f"分类: {article.category}")
                print(f"标签: {', '.join(article.tags)}")
                print(f"状态: {'已发布' if article.published else '草稿'}")
                print(f"创建时间: {article.created_at}")
                print(f"更新时间: {article.updated_at}")
                print(f"{'='*60}")
                print(f"\n内容:\n{article.content}")
        elif choice == "5":
            keyword = input("搜索关键词: ").strip()
            if keyword:
                blog.search_articles(keyword)
        elif choice == "6":
            blog.list_articles(published_only=False)
            article_id = input("请输入文章ID: ").strip()
            article = blog.find_article(article_id)
            if not article:
                print("文章不存在")
            else:
                # Toggle the publication state, then persist it.
                if article.published:
                    article.unpublish()
                    print("文章已取消发布")
                else:
                    article.publish()
                    print("文章已发布")
                blog.save_data()
        elif choice == "7":
            blog.get_category_stats()
        else:
            print("无效选择,请重试")
if __name__ == "__main__":
main()
🎯 项目4:数据分析工具
项目描述
创建一个简单的数据分析工具,支持CSV数据的读取、分析和可视化。
功能需求
- 读取CSV数据
- 基本统计分析
- 数据清洗
- 简单图表生成
实现代码
python
import csv
import json
import statistics
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
@dataclass
class DataPoint:
    """One row of a dataset, stored as a column-name-to-value mapping."""
    values: Dict[str, Any]

    def get_value(self, column: str):
        """Return the value stored under ``column``, or None when it is absent."""
        row = self.values
        return row.get(column, None)
class DataAnalyzer:
    """Loads a CSV file into memory and offers simple statistics on it."""

    def __init__(self):
        """Start with no rows and no columns."""
        self.data: List[DataPoint] = []
        self.columns: List[str] = []

    def load_csv(self, filename: str):
        """Replace the current dataset with the contents of ``filename``.

        Cell values are converted with ``_convert_value``.  On failure the
        previously loaded dataset is kept unchanged and False is returned.
        """
        try:
            # utf-8-sig also accepts plain UTF-8 and strips a leading BOM,
            # which would otherwise become part of the first column name.
            with open(filename, 'r', newline='', encoding='utf-8-sig') as f:
                reader = csv.DictReader(f)
                # fieldnames is None for an empty file; normalise to a list.
                columns = list(reader.fieldnames or [])
                rows = []
                for row in reader:
                    # DictReader collects surplus cells of an over-long row
                    # in a list under the key None; those are not a column.
                    rows.append(DataPoint({
                        key: self._convert_value(value)
                        for key, value in row.items()
                        if key is not None
                    }))
        except (OSError, UnicodeDecodeError, csv.Error) as e:
            print(f"加载CSV文件失败: {e}")
            return False
        self.columns = columns
        self.data = rows
        print(f"成功加载 {len(self.data)} 行数据,{len(self.columns)} 列")
        return True

    def _convert_value(self, value: str):
        """Convert a raw cell to None, int, float or a stripped string.

        Empty or missing cells become None.  Text containing a dot is tried
        as float, anything else as int; text that does not parse is returned
        stripped of surrounding whitespace.
        """
        if value is None:
            return None
        text = value.strip()
        if not text:
            return None
        try:
            return float(text) if '.' in text else int(text)
        except ValueError:
            return text

    def get_basic_stats(self, column: str):
        """Return descriptive statistics for the numeric values of ``column``.

        Non-numeric and missing values are ignored.  Returns None (after
        printing a message) when the column has no values or none of them
        are numeric.
        """
        values = [point.get_value(column) for point in self.data]
        values = [v for v in values if v is not None]
        if not values:
            print(f"列 '{column}' 没有有效数据")
            return None
        numeric_values = [v for v in values if isinstance(v, (int, float))]
        if not numeric_values:
            print(f"列 '{column}' 不是数值类型")
            return None
        # A mode is only reported when at least one value repeats.
        has_repeats = len(set(numeric_values)) < len(numeric_values)
        stats = {
            "count": len(numeric_values),
            "mean": statistics.mean(numeric_values),
            "median": statistics.median(numeric_values),
            "mode": statistics.mode(numeric_values) if has_repeats else "无众数",
            "min": min(numeric_values),
            "max": max(numeric_values),
            "std": statistics.stdev(numeric_values) if len(numeric_values) > 1 else 0,
        }
        return stats

    def filter_data(self, column: str, condition: str, value: Any):
        """Return the rows whose ``column`` satisfies ``condition`` against ``value``.

        ``==`` and ``!=`` compare any values.  The ordering operators only
        match numeric cells and need a numeric ``value``; with a non-numeric
        ``value`` they match nothing instead of raising ``TypeError``.
        An unknown ``condition`` yields an empty list.
        """
        equality = {
            "==": lambda cell, ref: cell == ref,
            "!=": lambda cell, ref: cell != ref,
        }
        ordering = {
            ">": lambda cell, ref: cell > ref,
            "<": lambda cell, ref: cell < ref,
            ">=": lambda cell, ref: cell >= ref,
            "<=": lambda cell, ref: cell <= ref,
        }
        if condition in equality:
            test = equality[condition]
            return [p for p in self.data if test(p.get_value(column), value)]
        if condition in ordering:
            if not isinstance(value, (int, float)):
                return []
            test = ordering[condition]
            return [
                p for p in self.data
                if isinstance(p.get_value(column), (int, float))
                and test(p.get_value(column), value)
            ]
        return []

    def group_by(self, group_column: str, agg_column: str, agg_func: str = "count"):
        """Group rows by ``group_column`` and aggregate ``agg_column`` per group.

        ``count`` counts the non-missing values.  ``sum``, ``mean``, ``max``
        and ``min`` work on the numeric values only; a group without numeric
        values is left out of the result, as is every group when
        ``agg_func`` is not recognised.
        """
        groups = {}
        for point in self.data:
            bucket = groups.setdefault(point.get_value(group_column), [])
            agg_value = point.get_value(agg_column)
            if agg_value is not None:
                bucket.append(agg_value)
        reducers = {
            "sum": sum,
            "mean": lambda xs: sum(xs) / len(xs),
            "max": max,
            "min": min,
        }
        result = {}
        for group, values in groups.items():
            if agg_func == "count":
                result[group] = len(values)
                continue
            reducer = reducers.get(agg_func)
            # Aggregate only the numeric values so a stray text cell in the
            # column cannot raise TypeError.
            numeric = [v for v in values if isinstance(v, (int, float))]
            if reducer is not None and numeric:
                result[group] = reducer(numeric)
        return result

    def generate_report(self):
        """Print an overview of the dataset and statistics for each numeric column."""
        if not self.data:
            print("没有数据可分析")
            return
        print("\n" + "="*60)
        print("数据分析报告")
        print("="*60)
        print(f"数据概览:")
        print(f" 总行数: {len(self.data)}")
        print(f" 总列数: {len(self.columns)}")
        print(f" 列名: {', '.join(self.columns)}")
        print(f"\n各列统计:")
        for column in self.columns:
            stats = self.get_basic_stats(column)
            if stats:
                print(f"\n{column}:")
                print(f" 有效值数量: {stats['count']}")
                print(f" 平均值: {stats['mean']:.2f}")
                print(f" 中位数: {stats['median']:.2f}")
                print(f" 最小值: {stats['min']}")
                print(f" 最大值: {stats['max']}")
                print(f" 标准差: {stats['std']:.2f}")

    def export_results(self, filename: str, data: Optional[List["DataPoint"]] = None):
        """Write ``data`` (default: the whole dataset) to ``filename`` as CSV.

        Returns True on success, False after printing the error otherwise.
        """
        if data is None:
            data = self.data
        try:
            with open(filename, 'w', newline='', encoding='utf-8') as f:
                writer = csv.writer(f)
                writer.writerow(self.columns)
                for point in data:
                    writer.writerow([point.get_value(col) for col in self.columns])
        except (OSError, csv.Error) as e:
            print(f"导出失败: {e}")
            return False
        print(f"结果已导出到 {filename}")
        return True
def main():
    """Run the interactive menu loop of the data analysis tool."""
    analyzer = DataAnalyzer()
    # Menu options that are meaningless before a dataset has been loaded.
    needs_data = {"2", "3", "4", "5", "7"}
    while True:
        print("\n" + "="*50)
        print("数据分析工具")
        print("="*50)
        print("1. 加载CSV文件")
        print("2. 查看数据概览")
        print("3. 基本统计分析")
        print("4. 数据过滤")
        print("5. 分组聚合")
        print("6. 生成报告")
        print("7. 导出结果")
        print("0. 退出")
        choice = input("\n请选择操作 (0-7): ").strip()

        if choice in needs_data and not analyzer.data:
            print("请先加载数据")
            continue

        if choice == "1":
            filename = input("CSV文件名: ").strip()
            if filename:
                analyzer.load_csv(filename)
        elif choice == "2":
            print(f"\n数据概览:")
            print(f"行数: {len(analyzer.data)}")
            print(f"列数: {len(analyzer.columns)}")
            print(f"列名: {', '.join(analyzer.columns)}")
            # Show the first few rows as a preview.
            print(f"\n前5行数据:")
            for i, point in enumerate(analyzer.data[:5]):
                print(f"第{i+1}行: {point.values}")
        elif choice == "3":
            column = input("请输入列名: ").strip()
            if column in analyzer.columns:
                stats = analyzer.get_basic_stats(column)
                if stats:
                    print(f"\n{column} 统计信息:")
                    for key, value in stats.items():
                        print(f" {key}: {value}")
            else:
                print("列名不存在")
        elif choice == "4":
            column = input("列名: ").strip()
            condition = input("条件 (==, !=, >, <, >=, <=): ").strip()
            value_input = input("值: ").strip()
            # Validate the column like options 3 and 5 do, instead of
            # silently reporting zero matches for a mistyped name.
            if column not in analyzer.columns:
                print("列名不存在")
                continue
            # Convert the way CSV cells are converted so that numbers compare
            # as numbers.  The conversion handles unparsable text itself, so
            # no exception handler is needed around it.
            value = analyzer._convert_value(value_input)
            filtered = analyzer.filter_data(column, condition, value)
            print(f"过滤结果: {len(filtered)} 行")
            if filtered:
                show_details = input("显示详细信息? (y/N): ").strip().lower() == 'y'
                if show_details:
                    for i, point in enumerate(filtered[:10]):  # first 10 rows only
                        print(f"第{i+1}行: {point.values}")
        elif choice == "5":
            group_column = input("分组列: ").strip()
            agg_column = input("聚合列: ").strip()
            agg_func = input("聚合函数 (count, sum, mean, max, min): ").strip()
            if group_column in analyzer.columns and agg_column in analyzer.columns:
                result = analyzer.group_by(group_column, agg_column, agg_func)
                print(f"\n分组聚合结果:")
                for group, value in result.items():
                    print(f" {group}: {value}")
            else:
                print("列名不存在")
        elif choice == "6":
            analyzer.generate_report()
        elif choice == "7":
            filename = input("导出文件名: ").strip()
            if filename:
                analyzer.export_results(filename)
        elif choice == "0":
            print("感谢使用数据分析工具!")
            break
        else:
            print("无效选择,请重试")
if __name__ == "__main__":
main()
🎯 项目5:网络爬虫
项目描述
创建一个简单的网络爬虫,用于抓取网页内容并保存到文件。
功能需求
- 抓取网页内容
- 解析HTML
- 提取特定信息
- 保存到文件
实现代码
python
import csv
import json
import re
import time
from collections import deque
from typing import Dict, List, Optional, Set
from urllib.parse import urldefrag, urljoin, urlparse

import requests
class WebCrawler:
    """Small breadth-first crawler that extracts text, links, e-mails and phones."""

    def __init__(self, delay: float = 1.0):
        """Create a crawler that sleeps ``delay`` seconds after each fetched page."""
        self.delay = delay
        self.visited_urls: Set[str] = set()
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })

    def fetch_page(self, url: str) -> "Optional[requests.Response]":
        """GET ``url`` with a 10 second timeout.

        Returns the response, or None (after printing the error) when the
        request fails or the server answers with an error status.
        """
        try:
            print(f"正在抓取: {url}")
            response = self.session.get(url, timeout=10)
            response.raise_for_status()
            return response
        except requests.RequestException as e:
            print(f"抓取失败 {url}: {e}")
            return None

    def extract_links(self, html: str, base_url: str) -> List[str]:
        """Return the absolute http(s) links found in ``html``.

        Links are resolved against ``base_url``, stripped of their fragment
        (so ``page#a`` and ``page#b`` count as one page) and de-duplicated
        while keeping document order, which makes the crawl order
        reproducible.
        """
        link_pattern = r'href=["\']([^"\']+)["\']'
        unique_links = {}
        for match in re.findall(link_pattern, html, re.IGNORECASE):
            try:
                absolute_url = urldefrag(urljoin(base_url, match)).url
            except ValueError:
                # Malformed href (e.g. a broken IPv6 literal): skip it.
                continue
            if self.is_valid_url(absolute_url):
                unique_links.setdefault(absolute_url, None)
        return list(unique_links)

    def extract_text(self, html: str) -> str:
        """Return the visible text of ``html`` with whitespace collapsed.

        Script and style elements are dropped first, then all remaining tags
        are removed.  This is a regex approximation, not an HTML parser.
        """
        text = re.sub(r'<script.*?</script>', '', html, flags=re.DOTALL | re.IGNORECASE)
        text = re.sub(r'<style.*?</style>', '', text, flags=re.DOTALL | re.IGNORECASE)
        text = re.sub(r'<[^>]+>', '', text)
        text = re.sub(r'\s+', ' ', text)
        return text.strip()

    def extract_emails(self, text: str) -> List[str]:
        """Return every e-mail-like string in ``text``, in order of appearance."""
        # The top-level-domain class is [A-Za-z]; writing it as [A-Z|a-z]
        # would also accept a literal "|" as part of the address.
        email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
        return re.findall(email_pattern, text)

    def extract_phones(self, text: str) -> List[str]:
        """Return every 3-3-4 digit phone-like string in ``text``."""
        phone_pattern = r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b'
        return re.findall(phone_pattern, text)

    def is_valid_url(self, url: str) -> bool:
        """Return True when ``url`` is an http/https URL with a host part."""
        try:
            parsed = urlparse(url)
            return bool(parsed.netloc) and parsed.scheme in ['http', 'https']
        except (ValueError, TypeError, AttributeError):
            return False

    def crawl_single_page(self, url: str) -> Optional[Dict]:
        """Fetch ``url`` and return a dict describing the page.

        Returns None when the URL was already visited or could not be
        fetched.  A successfully fetched URL is added to ``visited_urls``,
        and the method sleeps ``self.delay`` seconds before returning.
        """
        if url in self.visited_urls:
            return None
        response = self.fetch_page(url)
        if not response:
            return None
        self.visited_urls.add(url)
        html = response.text
        text = self.extract_text(html)
        data = {
            'url': url,
            'title': self.extract_title(html),
            'text': text[:1000],  # keep the stored text short
            'emails': self.extract_emails(text),
            'phones': self.extract_phones(text),
            'links': self.extract_links(html, url),
            'status_code': response.status_code,
            'content_length': len(html),
            'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'),
        }
        time.sleep(self.delay)  # be polite to the server
        return data

    def extract_title(self, html: str) -> str:
        """Return the page title with whitespace collapsed, or "无标题" if none.

        A ``<title>`` tag carrying attributes is matched as well.
        """
        title_match = re.search(r'<title[^>]*>(.*?)</title>', html, re.IGNORECASE | re.DOTALL)
        if title_match:
            title = " ".join(title_match.group(1).split())
            if title:
                return title
        return "无标题"

    def crawl_website(self, start_url: str, max_pages: int = 10) -> List[Dict]:
        """Crawl breadth-first from ``start_url`` until ``max_pages`` pages are fetched.

        Every URL is queued at most once per call, so a URL that failed to
        load is not retried each time another page links to it.
        """
        results = []
        queue = deque([start_url])
        enqueued = {start_url}
        while queue and len(results) < max_pages:
            current_url = queue.popleft()
            if current_url in self.visited_urls:
                continue
            page_data = self.crawl_single_page(current_url)
            if not page_data:
                continue
            results.append(page_data)
            for link in page_data['links']:
                if link not in enqueued and link not in self.visited_urls:
                    enqueued.add(link)
                    queue.append(link)
        return results

    def save_to_json(self, data: List[Dict], filename: str):
        """Write ``data`` to ``filename`` as indented UTF-8 JSON."""
        try:
            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
            print(f"数据已保存到 {filename}")
        except (OSError, TypeError, ValueError) as e:
            print(f"保存JSON文件失败: {e}")

    def save_to_csv(self, data: List[Dict], filename: str):
        """Write ``data`` to ``filename`` as CSV, using the first row's keys as header.

        List-like cell values (e-mails, phones, links) are joined with "; "
        so the file holds plain text rather than Python list reprs.
        """
        if not data:
            print("没有数据可保存")
            return
        try:
            with open(filename, 'w', newline='', encoding='utf-8') as f:
                writer = csv.DictWriter(f, fieldnames=list(data[0].keys()))
                writer.writeheader()
                for page in data:
                    writer.writerow({
                        key: "; ".join(map(str, value))
                        if isinstance(value, (list, tuple, set)) else value
                        for key, value in page.items()
                    })
            print(f"数据已保存到 {filename}")
        except (OSError, ValueError, csv.Error) as e:
            print(f"保存CSV文件失败: {e}")

    def generate_report(self, data: List[Dict]):
        """Print totals, the list of crawled pages and all distinct e-mails found."""
        if not data:
            print("没有爬取到数据")
            return
        print("\n" + "="*60)
        print("爬取报告")
        print("="*60)
        total_pages = len(data)
        total_emails = sum(len(page['emails']) for page in data)
        total_phones = sum(len(page['phones']) for page in data)
        total_links = sum(len(page['links']) for page in data)
        print(f"爬取页面数: {total_pages}")
        print(f"发现邮箱数: {total_emails}")
        print(f"发现电话数: {total_phones}")
        print(f"发现链接数: {total_links}")
        print(f"\n页面列表:")
        for i, page in enumerate(data, 1):
            print(f"{i}. {page['title']} - {page['url']}")
        # Distinct e-mail addresses across all pages.
        all_emails = set()
        for page in data:
            all_emails.update(page['emails'])
        if all_emails:
            print(f"\n发现的邮箱:")
            for email in sorted(all_emails):
                print(f" {email}")
def main():
    """Run the interactive menu loop of the crawler tool.

    Pages crawled during the session are accumulated so that the JSON and
    CSV menu options have data to write.
    """
    crawler = WebCrawler(delay=1.0)
    results: List[Dict] = []  # every page crawled in this session
    while True:
        print("\n" + "="*50)
        print("网络爬虫工具")
        print("="*50)
        print("1. 爬取单个页面")
        print("2. 爬取网站")
        print("3. 查看爬取结果")
        print("4. 保存为JSON")
        print("5. 保存为CSV")
        print("0. 退出")
        choice = input("\n请选择操作 (0-5): ").strip()
        if choice == "1":
            url = input("请输入URL: ").strip()
            if url:
                data = crawler.crawl_single_page(url)
                if data:
                    results.append(data)
                    print(f"\n页面标题: {data['title']}")
                    print(f"页面URL: {data['url']}")
                    print(f"文本长度: {len(data['text'])}")
                    print(f"发现邮箱: {len(data['emails'])}")
                    print(f"发现电话: {len(data['phones'])}")
                    print(f"发现链接: {len(data['links'])}")
        elif choice == "2":
            url = input("请输入起始URL: ").strip()
            raw_limit = input("最大爬取页面数 (默认10): ").strip() or "10"
            try:
                max_pages = int(raw_limit)
            except ValueError:
                # A non-numeric limit used to abort the whole program.
                print("请输入有效数字")
                continue
            if url:
                print("开始爬取...")
                data = crawler.crawl_website(url, max_pages)
                results.extend(data)
                crawler.generate_report(data)
        elif choice == "3":
            if crawler.visited_urls:
                print(f"已访问的URL ({len(crawler.visited_urls)}):")
                for url in crawler.visited_urls:
                    print(f" {url}")
            else:
                print("还没有爬取任何页面")
        elif choice == "4":
            filename = input("JSON文件名 (默认: crawl_results.json): ").strip()
            if not filename:
                filename = "crawl_results.json"
            if results:
                crawler.save_to_json(results, filename)
            else:
                print("请先爬取一些页面")
        elif choice == "5":
            filename = input("CSV文件名 (默认: crawl_results.csv): ").strip()
            if not filename:
                filename = "crawl_results.csv"
            if results:
                crawler.save_to_csv(results, filename)
            else:
                print("请先爬取一些页面")
        elif choice == "0":
            print("感谢使用爬虫工具!")
            break
        else:
            print("无效选择,请重试")
if __name__ == "__main__":
main()
📚 学习资源
推荐书籍
- 《Python编程:从入门到实践》
- 《流畅的Python》
- 《Python Cookbook》
- 《Effective Python》
在线资源
实践建议
- 从简单开始:先完成基础项目,再挑战复杂项目
- 注重代码质量:编写清晰、可读的代码
- 测试驱动:为你的代码编写测试
- 版本控制:使用Git管理代码
- 持续学习:关注Python社区和新技术
🎯 下一步学习方向
进阶主题
- Web开发:Django、Flask、FastAPI
- 数据科学:Pandas、NumPy、Matplotlib
- 机器学习:Scikit-learn、TensorFlow、PyTorch
- 自动化:Selenium、Requests、BeautifulSoup
- GUI开发:Tkinter、PyQt、Kivy
项目扩展
- 为现有项目添加Web界面
- 实现数据库存储
- 添加用户认证系统
- 部署到云平台
- 添加API接口
💡 记住:编程是一门实践性很强的技能,多动手、多思考、多总结
