Skip to content

🚀 Python 实践项目

项目概述

通过实际项目来巩固和运用所学的 Python 知识。这些项目涵盖了从基础到进阶的各种应用场景,帮助你提升编程技能和解决实际问题的能力。

💡 学习建议:

  • 从简单项目开始,逐步增加复杂度
  • 注重代码质量和最佳实践
  • 多思考项目的扩展性和优化
  • 记录学习过程和遇到的问题

🎯 项目1:个人任务管理器

项目描述

创建一个命令行任务管理器,支持添加、删除、查看和完成任务。

功能需求

  • 添加新任务
  • 标记任务完成
  • 删除任务
  • 查看所有任务
  • 按状态筛选任务
  • 数据持久化

实现代码

python
import json
import os
from datetime import datetime
from typing import List, Dict

class Task:
    """A single to-do item with creation and completion timestamps.

    All attributes are plain and public so that ``to_dict`` / ``from_dict``
    can round-trip a task through JSON.
    """

    def __init__(self, title: str, description: str = ""):
        """Create a new, not-yet-completed task.

        Args:
            title: Short name of the task.
            description: Optional free-form details.
        """
        # One clock reading so that ``id`` and ``created_at`` always agree.
        now = datetime.now()
        # A second-resolution timestamp alone collides when two tasks are
        # created within the same second, so a short random suffix is added.
        self.id = f"{now.strftime('%Y%m%d%H%M%S')}-{os.urandom(3).hex()}"
        self.title = title
        self.description = description
        self.created_at = now.isoformat()
        self.completed = False
        self.completed_at = None

    def complete(self):
        """Mark the task as done and record the completion time."""
        self.completed = True
        self.completed_at = datetime.now().isoformat()

    def to_dict(self):
        """Return a JSON-serialisable dict holding every attribute."""
        return {
            "id": self.id,
            "title": self.title,
            "description": self.description,
            "created_at": self.created_at,
            "completed": self.completed,
            "completed_at": self.completed_at
        }

    @classmethod
    def from_dict(cls, data: Dict):
        """Rebuild a task from a dict produced by ``to_dict``.

        ``id``, ``title`` and ``created_at`` are required; ``description``,
        ``completed`` and ``completed_at`` fall back to their "new task"
        defaults when absent.

        Raises:
            KeyError: If one of the required keys is missing.
        """
        task = cls(data["title"], data.get("description", ""))
        # Overwrite the values generated by __init__ with the stored ones.
        task.id = data["id"]
        task.created_at = data["created_at"]
        task.completed = data.get("completed", False)
        task.completed_at = data.get("completed_at")
        return task

class TaskManager:
    """In-memory task list that is persisted to a JSON file.

    Every mutating operation (add, complete, delete) writes the whole list
    back to ``data_file`` immediately.
    """

    def __init__(self, data_file: str = "tasks.json"):
        """Remember the storage path and load any previously saved tasks.

        Args:
            data_file: Path of the JSON file used for persistence.
        """
        self.data_file = data_file
        self.tasks: List[Task] = []
        self.load_tasks()

    def load_tasks(self):
        """Populate ``self.tasks`` from ``self.data_file``.

        A missing file is not an error and leaves the list untouched. A file
        that cannot be read or decoded is reported on stdout and the list is
        reset to empty.
        """
        try:
            if os.path.exists(self.data_file):
                with open(self.data_file, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                self.tasks = [Task.from_dict(task_data) for task_data in data]
        except (OSError, ValueError, KeyError, TypeError, AttributeError) as e:
            # ValueError covers JSON and Unicode decoding errors; the other
            # types cover well-formed JSON that has the wrong shape.
            print(f"加载任务时发生错误: {e}")
            self.tasks = []

    def save_tasks(self):
        """Write all tasks to ``self.data_file``.

        The data is written to a sibling temporary file first and then moved
        over the target, so a failure while serialising or writing never
        leaves a truncated task file behind. Errors are reported on stdout.
        """
        tmp_file = f"{self.data_file}.tmp"
        try:
            data = [task.to_dict() for task in self.tasks]
            with open(tmp_file, 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
            os.replace(tmp_file, self.data_file)
        except Exception as e:
            print(f"保存任务时发生错误: {e}")
            # Best-effort cleanup of the partial temporary file.
            try:
                os.remove(tmp_file)
            except OSError:
                pass

    def add_task(self, title: str, description: str = ""):
        """Create a task, append it to the list and save.

        Args:
            title: Title of the new task.
            description: Optional description.
        """
        task = Task(title, description)

        # Ids are what the user types to pick a task, so they must be unique
        # within the list; disambiguate with a numeric suffix if needed.
        taken = {existing.id for existing in self.tasks}
        if task.id in taken:
            base_id = task.id
            counter = 1
            while f"{base_id}-{counter}" in taken:
                counter += 1
            task.id = f"{base_id}-{counter}"

        self.tasks.append(task)
        self.save_tasks()
        print(f"任务 '{title}' 已添加")

    def complete_task(self, task_id: str):
        """Mark the task with ``task_id`` as completed.

        Returns:
            True if a matching task was found (and saved), otherwise False.
        """
        for task in self.tasks:
            if task.id == task_id:
                task.complete()
                self.save_tasks()
                print(f"任务 '{task.title}' 已完成")
                return True
        print("任务未找到")
        return False

    def delete_task(self, task_id: str):
        """Remove the task with ``task_id``.

        Returns:
            True if a matching task was removed (and saved), otherwise False.
        """
        for i, task in enumerate(self.tasks):
            if task.id == task_id:
                deleted_task = self.tasks.pop(i)
                self.save_tasks()
                print(f"任务 '{deleted_task.title}' 已删除")
                return True
        print("任务未找到")
        return False

    def list_tasks(self, show_completed: bool = True):
        """Print the tasks, optionally hiding completed ones.

        Args:
            show_completed: When False, only pending tasks are printed.
        """
        visible = [
            task for task in self.tasks
            if show_completed or not task.completed
        ]
        # Also covers the case where the filter hides every task, which
        # would otherwise print a header followed by nothing.
        if not visible:
            print("没有任务")
            return

        separator = '=' * 50
        print(f"\n{separator}")
        print("任务列表")
        print(separator)

        for task in visible:
            status = "✅ 已完成" if task.completed else "⏳ 进行中"
            print(f"\nID: {task.id}")
            print(f"标题: {task.title}")
            if task.description:
                print(f"描述: {task.description}")
            print(f"状态: {status}")
            print(f"创建时间: {task.created_at}")
            if task.completed_at:
                print(f"完成时间: {task.completed_at}")
            print("-" * 30)

    def get_stats(self):
        """Print total, completed and pending counts plus the completion rate."""
        total = len(self.tasks)
        completed = sum(1 for task in self.tasks if task.completed)
        pending = total - completed

        print(f"\n任务统计:")
        print(f"总任务数: {total}")
        print(f"已完成: {completed}")
        print(f"待完成: {pending}")
        # The rate is undefined for an empty list, so it is simply omitted.
        if total > 0:
            completion_rate = (completed / total) * 100
            print(f"完成率: {completion_rate:.1f}%")

def main():
    """Run the interactive menu of the task manager until the user picks 0."""
    manager = TaskManager()
    rule = "=" * 50
    menu = (
        "1. 添加任务",
        "2. 完成任务",
        "3. 删除任务",
        "4. 查看所有任务",
        "5. 查看待完成任务",
        "6. 查看任务统计",
        "0. 退出",
    )

    while True:
        print("\n" + rule)
        print("个人任务管理器")
        print(rule)
        for entry in menu:
            print(entry)

        choice = input("\n请选择操作 (0-6): ").strip()

        if choice == "0":
            print("感谢使用任务管理器!")
            break

        if choice == "1":
            title = input("请输入任务标题: ").strip()
            description = input("请输入任务描述 (可选): ").strip()
            if not title:
                print("任务标题不能为空")
            else:
                manager.add_task(title, description)

        elif choice == "2":
            # Show the list first so the user can read the id to type.
            manager.list_tasks()
            task_id = input("请输入要完成的任务ID: ").strip()
            if task_id:
                manager.complete_task(task_id)

        elif choice == "3":
            manager.list_tasks()
            task_id = input("请输入要删除的任务ID: ").strip()
            # The confirmation prompt is only shown once an id was entered.
            if task_id and input("确认删除? (y/N): ").strip().lower() == 'y':
                manager.delete_task(task_id)

        elif choice in ("4", "5"):
            # "4" lists everything, "5" hides completed tasks.
            manager.list_tasks(show_completed=(choice == "4"))

        elif choice == "6":
            manager.get_stats()

        else:
            print("无效选择,请重试")


if __name__ == "__main__":
    main()

🎯 项目2:学生成绩管理系统

项目描述

创建一个学生成绩管理系统,支持学生信息管理和成绩统计。

功能需求

  • 添加学生信息
  • 录入学生成绩
  • 计算平均分和排名
  • 生成成绩报告
  • 数据导入导出

实现代码

python
import csv
import json
from typing import List, Dict, Optional
from dataclasses import dataclass, asdict

@dataclass
class Student:
    """One student's identity data plus a subject -> score mapping."""
    student_id: str
    name: str
    age: int
    class_name: str
    scores: Dict[str, float] = None  # replaced by a fresh dict in __post_init__

    def __post_init__(self):
        """Give the instance its own empty score dict when none was passed."""
        self.scores = {} if self.scores is None else self.scores

    def add_score(self, subject: str, score: float):
        """Store ``score`` for ``subject`` if it lies within 0..100.

        Returns:
            True when the score was stored, False when it was out of range.
        """
        in_range = 0 <= score <= 100
        if not in_range:
            return False
        self.scores[subject] = score
        return True

    def get_average_score(self) -> float:
        """Return the mean of all recorded scores, or 0.0 when there are none."""
        subject_count = len(self.scores)
        if subject_count == 0:
            return 0.0
        return sum(self.scores.values()) / subject_count

    def get_total_score(self) -> float:
        """Return the sum of all recorded scores."""
        recorded = self.scores.values()
        return sum(recorded)

class GradeManager:
    """Holds the student roster and offers score entry, reports and CSV exchange."""

    def __init__(self):
        """Start with an empty roster and the fixed list of subjects."""
        self.students: List[Student] = []
        self.subjects = ["语文", "数学", "英语", "物理", "化学"]

    def add_student(self, student_id: str, name: str, age: int, class_name: str):
        """Add a student unless the student id is already taken.

        Returns:
            True if the student was added, False if the id already exists.
        """
        if self.find_student(student_id) is not None:
            print(f"学号 {student_id} 已存在")
            return False

        student = Student(student_id, name, age, class_name)
        self.students.append(student)
        print(f"学生 {name} 添加成功")
        return True

    def input_scores(self, student_id: str):
        """Interactively read one score per subject for the given student.

        Each prompt is repeated until a number in the 0-100 range is entered.

        Returns:
            True once all subjects were entered, False if the student id is
            unknown.
        """
        student = self.find_student(student_id)
        if not student:
            print("学生不存在")
            return False

        print(f"为 {student.name} 录入成绩:")
        for subject in self.subjects:
            while True:
                try:
                    score = float(input(f"{subject} 成绩 (0-100): "))
                except ValueError:
                    print("请输入有效数字")
                    continue
                if student.add_score(subject, score):
                    break
                print("成绩必须在 0-100 之间")

        print("成绩录入完成")
        return True

    def find_student(self, student_id: str) -> Optional["Student"]:
        """Return the student with ``student_id`` or None if there is none."""
        for student in self.students:
            if student.student_id == student_id:
                return student
        return None

    def calculate_rankings(self) -> List["Student"]:
        """Return a new list of students sorted by average score, best first."""
        return sorted(self.students, key=lambda s: s.get_average_score(), reverse=True)

    def generate_report(self):
        """Print the ranking table followed by per-class average statistics."""
        if not self.students:
            print("没有学生数据")
            return

        print("\n" + "="*80)
        print("学生成绩报告")
        print("="*80)

        rankings = self.calculate_rankings()

        print(f"{'排名':<4} {'学号':<12} {'姓名':<8} {'班级':<10} {'平均分':<8} {'总分':<8}")
        print("-" * 80)

        for i, student in enumerate(rankings, 1):
            avg_score = student.get_average_score()
            total_score = student.get_total_score()
            print(f"{i:<4} {student.student_id:<12} {student.name:<8} "
                  f"{student.class_name:<10} {avg_score:<8.1f} {total_score:<8.1f}")

        print("\n" + "="*50)
        print("班级统计")
        print("="*50)

        # Collect each student's average under the name of their class.
        class_stats = {}
        for student in self.students:
            class_stats.setdefault(student.class_name, []).append(
                student.get_average_score()
            )

        for class_name, scores in class_stats.items():
            avg_score = sum(scores) / len(scores)
            print(f"{class_name}: 平均分 {avg_score:.1f}, 学生数 {len(scores)}")

    def export_to_csv(self, filename: str):
        """Write all students to ``filename`` as CSV.

        Columns are the identity fields, one column per subject, then the
        average and total. A subject without a recorded score is written as
        an empty cell (not 0), so that re-importing the file does not turn a
        missing score into a real zero.

        Returns:
            True on success, False if the file could not be written.
        """
        try:
            with open(filename, 'w', newline='', encoding='utf-8') as f:
                writer = csv.writer(f)

                headers = ["学号", "姓名", "年龄", "班级"] + self.subjects + ["平均分", "总分"]
                writer.writerow(headers)

                for student in self.students:
                    row = [
                        student.student_id,
                        student.name,
                        student.age,
                        student.class_name
                    ]
                    row.extend(student.scores.get(subject, "") for subject in self.subjects)
                    row.append(student.get_average_score())
                    row.append(student.get_total_score())
                    writer.writerow(row)

            print(f"数据已导出到 {filename}")
            return True
        except (OSError, csv.Error) as e:
            print(f"导出失败: {e}")
            return False

    def import_from_csv(self, filename: str):
        """Append the students found in ``filename`` to the roster.

        The file must have the header columns 学号, 姓名, 年龄 and 班级;
        subject columns are optional and empty cells are skipped. Rows whose
        student id already exists (in the roster or earlier in the file) are
        reported and skipped. Scores rejected by ``Student.add_score`` are
        not imported. The roster is only modified when the whole file was
        read successfully.

        Returns:
            True on success, False if the file could not be read or parsed.
        """
        imported = []
        known_ids = {student.student_id for student in self.students}
        try:
            # utf-8-sig also accepts plain UTF-8 and drops a leading BOM,
            # which would otherwise become part of the first header name.
            with open(filename, 'r', newline='', encoding='utf-8-sig') as f:
                reader = csv.DictReader(f)

                for row in reader:
                    student_id = row["学号"]
                    if student_id in known_ids:
                        print(f"学号 {student_id} 已存在")
                        continue

                    student = Student(
                        student_id, row["姓名"], int(row["年龄"]), row["班级"]
                    )

                    for subject in self.subjects:
                        raw_score = row.get(subject)
                        if raw_score:
                            student.add_score(subject, float(raw_score))

                    known_ids.add(student_id)
                    imported.append(student)
        except (OSError, csv.Error, KeyError, ValueError, TypeError) as e:
            print(f"导入失败: {e}")
            return False

        self.students.extend(imported)
        # Report how many students this call added, not the roster size.
        print(f"从 {filename} 导入 {len(imported)} 个学生")
        return True

def main():
    """Run the interactive menu of the grade manager until the user picks 0."""
    manager = GradeManager()

    while True:
        print("\n" + "="*50)
        print("学生成绩管理系统")
        print("="*50)
        print("1. 添加学生")
        print("2. 录入成绩")
        print("3. 查看学生信息")
        print("4. 生成成绩报告")
        print("5. 导出数据")
        print("6. 导入数据")
        print("0. 退出")

        choice = input("\n请选择操作 (0-6): ").strip()

        if choice == "1":
            student_id = input("学号: ").strip()
            name = input("姓名: ").strip()
            # A non-numeric age must not take the whole program down; report
            # it and go back to the menu instead.
            try:
                age = int(input("年龄: "))
            except ValueError:
                print("请输入有效数字")
                continue
            class_name = input("班级: ").strip()
            manager.add_student(student_id, name, age, class_name)

        elif choice == "2":
            student_id = input("请输入学号: ").strip()
            manager.input_scores(student_id)

        elif choice == "3":
            student_id = input("请输入学号: ").strip()
            student = manager.find_student(student_id)
            if student:
                print(f"\n学生信息:")
                print(f"学号: {student.student_id}")
                print(f"姓名: {student.name}")
                print(f"年龄: {student.age}")
                print(f"班级: {student.class_name}")
                print(f"成绩: {student.scores}")
                print(f"平均分: {student.get_average_score():.1f}")
            else:
                print("学生不存在")

        elif choice == "4":
            manager.generate_report()

        elif choice == "5":
            # An empty answer selects the default file name.
            filename = input("导出文件名 (默认: students.csv): ").strip() or "students.csv"
            manager.export_to_csv(filename)

        elif choice == "6":
            filename = input("导入文件名: ").strip()
            if filename:
                manager.import_from_csv(filename)

        elif choice == "0":
            print("感谢使用成绩管理系统!")
            break

        else:
            print("无效选择,请重试")


if __name__ == "__main__":
    main()

🎯 项目3:简单博客系统

项目描述

创建一个简单的博客系统,支持文章的发布、编辑和查看。

功能需求

  • 发布新文章
  • 编辑现有文章
  • 查看文章列表
  • 搜索文章
  • 文章分类管理

实现代码

python
import json
import os
from datetime import datetime
from typing import List, Dict, Optional
from dataclasses import dataclass, asdict

@dataclass
class Article:
    """A blog post together with its metadata and publication flag."""
    id: str
    title: str
    content: str
    author: str
    category: str
    tags: List[str]
    created_at: str
    updated_at: str
    published: bool = False

    def update(self, title: str = None, content: str = None,
               category: str = None, tags: List[str] = None):
        """Overwrite the given fields and refresh ``updated_at``.

        Only truthy arguments are applied, so ``None``, an empty string or an
        empty tag list leave the corresponding field unchanged. The timestamp
        is refreshed on every call.
        """
        candidates = (
            ("title", title),
            ("content", content),
            ("category", category),
            ("tags", tags),
        )
        for field_name, new_value in candidates:
            if new_value:
                setattr(self, field_name, new_value)
        self.updated_at = datetime.now().isoformat()

    def publish(self):
        """Mark the article as published and refresh ``updated_at``."""
        self._set_published(True)

    def unpublish(self):
        """Turn the article back into a draft and refresh ``updated_at``."""
        self._set_published(False)

    def _set_published(self, state: bool):
        """Store the publication flag and stamp the modification time."""
        self.published = state
        self.updated_at = datetime.now().isoformat()

class BlogSystem:
    """Article store with listing, search and statistics, persisted as JSON."""

    def __init__(self, data_file: str = "blog_data.json"):
        """Set the default categories and load previously saved data.

        Args:
            data_file: Path of the JSON file used for persistence.
        """
        self.data_file = data_file
        self.articles: List[Article] = []
        self.categories = ["技术", "生活", "学习", "其他"]
        self.load_data()

    def load_data(self):
        """Load articles and categories from ``self.data_file``.

        A missing file leaves the current state untouched. A file that cannot
        be read or decoded is reported on stdout and the article list is
        reset to empty.
        """
        try:
            if os.path.exists(self.data_file):
                with open(self.data_file, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                self.articles = [Article(**article_data) for article_data in data.get("articles", [])]
                self.categories = data.get("categories", self.categories)
        except (OSError, ValueError, KeyError, TypeError, AttributeError) as e:
            # ValueError covers JSON and Unicode decoding errors; the other
            # types cover well-formed JSON that has the wrong shape.
            print(f"加载数据时发生错误: {e}")
            self.articles = []

    def save_data(self):
        """Write articles and categories to ``self.data_file``.

        The data goes to a sibling temporary file first and is then moved
        over the target, so a failed write never leaves a truncated data
        file behind. Errors are reported on stdout.
        """
        tmp_file = f"{self.data_file}.tmp"
        try:
            data = {
                "articles": [asdict(article) for article in self.articles],
                "categories": self.categories
            }
            with open(tmp_file, 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
            os.replace(tmp_file, self.data_file)
        except Exception as e:
            print(f"保存数据时发生错误: {e}")
            # Best-effort cleanup of the partial temporary file.
            try:
                os.remove(tmp_file)
            except OSError:
                pass

    def _unique_id(self, base: str) -> str:
        """Return ``base``, or ``base-N`` if ``base`` is already an article id."""
        taken = {article.id for article in self.articles}
        if base not in taken:
            return base
        counter = 1
        while f"{base}-{counter}" in taken:
            counter += 1
        return f"{base}-{counter}"

    def create_article(self, title: str, content: str, author: str,
                      category: str, tags: List[str]):
        """Create a draft article, store it and save.

        The id is a second-resolution timestamp; when another article was
        created within the same second a numeric suffix keeps it unique.

        Returns:
            The newly created article.
        """
        # One clock reading so id, created_at and updated_at all agree.
        now = datetime.now()
        timestamp = now.isoformat()
        article = Article(
            id=self._unique_id(now.strftime("%Y%m%d%H%M%S")),
            title=title,
            content=content,
            author=author,
            category=category,
            tags=tags,
            created_at=timestamp,
            updated_at=timestamp
        )
        self.articles.append(article)
        self.save_data()
        print(f"文章 '{title}' 创建成功")
        return article

    def find_article(self, article_id: str) -> Optional["Article"]:
        """Return the article with ``article_id`` or None if there is none."""
        for article in self.articles:
            if article.id == article_id:
                return article
        return None

    def list_articles(self, published_only: bool = True):
        """Print a summary of the articles.

        Args:
            published_only: When True, drafts are left out.
        """
        filtered_articles = self.articles
        if published_only:
            filtered_articles = [a for a in self.articles if a.published]

        if not filtered_articles:
            print("没有找到文章")
            return

        separator = '=' * 80
        print(f"\n{separator}")
        print("文章列表")
        print(separator)

        for article in filtered_articles:
            status = "已发布" if article.published else "草稿"
            print(f"\nID: {article.id}")
            print(f"标题: {article.title}")
            print(f"作者: {article.author}")
            print(f"分类: {article.category}")
            print(f"标签: {', '.join(article.tags)}")
            print(f"状态: {status}")
            print(f"创建时间: {article.created_at}")
            print(f"更新时间: {article.updated_at}")
            print("-" * 50)

    def search_articles(self, keyword: str):
        """Print the articles whose title, content, category or tags contain ``keyword``.

        Matching is case-insensitive and covers drafts as well as published
        articles.
        """
        keyword_lower = keyword.lower()

        results = [
            article for article in self.articles
            if (keyword_lower in article.title.lower() or
                keyword_lower in article.content.lower() or
                keyword_lower in article.category.lower() or
                any(keyword_lower in tag.lower() for tag in article.tags))
        ]

        if not results:
            print(f"没有找到包含 '{keyword}' 的文章")
            return

        print(f"\n找到 {len(results)} 篇相关文章:")
        for article in results:
            print(f"- {article.title} (作者: {article.author})")

    def get_category_stats(self):
        """Print, per category, the total and the published article count."""
        stats = {}
        for article in self.articles:
            entry = stats.setdefault(article.category, {"total": 0, "published": 0})
            entry["total"] += 1
            if article.published:
                entry["published"] += 1

        print("\n分类统计:")
        for category, data in stats.items():
            print(f"{category}: 总计 {data['total']} 篇, 已发布 {data['published']} 篇")

def main():
    """Run the interactive menu of the blog system until the user picks 0."""
    blog = BlogSystem()
    rule = "=" * 50
    wide_rule = "=" * 60
    menu = (
        "1. 发布新文章",
        "2. 编辑文章",
        "3. 查看文章列表",
        "4. 查看文章详情",
        "5. 搜索文章",
        "6. 发布/取消发布",
        "7. 分类统计",
        "0. 退出",
    )

    while True:
        print("\n" + rule)
        print("简单博客系统")
        print(rule)
        for entry in menu:
            print(entry)

        choice = input("\n请选择操作 (0-7): ").strip()

        if choice == "0":
            print("感谢使用博客系统!")
            break

        if choice == "1":
            title = input("文章标题: ").strip()
            content = input("文章内容: ").strip()
            author = input("作者: ").strip()

            print("可选分类:", ", ".join(blog.categories))
            category = input("分类: ").strip()

            # Split the comma-separated answer and drop blank pieces.
            raw_tags = input("标签 (用逗号分隔): ").strip()
            tags = []
            for piece in raw_tags.split(","):
                piece = piece.strip()
                if piece:
                    tags.append(piece)

            if not (title and content and author and category):
                print("请填写完整信息")
            else:
                blog.create_article(title, content, author, category, tags)
            continue

        if choice == "2":
            # Drafts are listed too so that they can be edited.
            blog.list_articles(published_only=False)
            article_id = input("请输入文章ID: ").strip()
            article = blog.find_article(article_id)

            if not article:
                print("文章不存在")
                continue

            print(f"\n编辑文章: {article.title}")
            new_title = input(f"新标题 (当前: {article.title}): ").strip()
            new_content = input(f"新内容 (当前: {article.content[:50]}...): ").strip()
            new_category = input(f"新分类 (当前: {article.category}): ").strip()

            # Nothing entered at all means nothing to save.
            if new_title or new_content or new_category:
                article.update(
                    title=new_title or article.title,
                    content=new_content or article.content,
                    category=new_category or article.category
                )
                blog.save_data()
                print("文章更新成功")
            continue

        if choice == "3":
            answer = input("显示所有文章? (y/N): ").strip().lower()
            blog.list_articles(published_only=(answer != 'y'))
            continue

        if choice == "4":
            article_id = input("请输入文章ID: ").strip()
            article = blog.find_article(article_id)

            if not article:
                print("文章不存在")
                continue

            status = '已发布' if article.published else '草稿'
            print(f"\n{wide_rule}")
            print(f"标题: {article.title}")
            print(f"作者: {article.author}")
            print(f"分类: {article.category}")
            print(f"标签: {', '.join(article.tags)}")
            print(f"状态: {status}")
            print(f"创建时间: {article.created_at}")
            print(f"更新时间: {article.updated_at}")
            print(wide_rule)
            print(f"\n内容:\n{article.content}")
            continue

        if choice == "5":
            keyword = input("搜索关键词: ").strip()
            if keyword:
                blog.search_articles(keyword)
            continue

        if choice == "6":
            blog.list_articles(published_only=False)
            article_id = input("请输入文章ID: ").strip()
            article = blog.find_article(article_id)

            if not article:
                print("文章不存在")
                continue

            # Toggle the publication state, then persist it.
            if article.published:
                article.unpublish()
                print("文章已取消发布")
            else:
                article.publish()
                print("文章已发布")
            blog.save_data()
            continue

        if choice == "7":
            blog.get_category_stats()
            continue

        print("无效选择,请重试")


if __name__ == "__main__":
    main()

🎯 项目4:数据分析工具

项目描述

创建一个简单的数据分析工具,支持CSV数据的读取、分析和可视化。

功能需求

  • 读取CSV数据
  • 基本统计分析
  • 数据清洗
  • 简单图表生成

实现代码

python
import csv
import json
import statistics
from typing import List, Dict, Any, Optional
from dataclasses import dataclass

@dataclass
class DataPoint:
    """One data row, stored as a mapping from column name to cell value."""
    values: Dict[str, Any]

    def get_value(self, column: str):
        """Return the cell stored under ``column``, or None if it is absent."""
        cell = self.values.get(column, None)
        return cell

class DataAnalyzer:
    """Loads a CSV file into memory and offers simple statistics on it.

    ``columns`` holds the header names and ``data`` one ``DataPoint`` per row.
    """

    def __init__(self):
        """Start without any data loaded."""
        self.data: List[DataPoint] = []
        self.columns: List[str] = []

    def load_csv(self, filename: str):
        """Replace the current data with the content of ``filename``.

        Cells are passed through ``_convert_value``. The current data is only
        replaced when the whole file was read successfully; on failure the
        previous state is kept.

        Returns:
            True on success, False if the file is unreadable, malformed or
            has no header row.
        """
        try:
            # utf-8-sig also accepts plain UTF-8 and drops a leading BOM,
            # which would otherwise become part of the first column name.
            with open(filename, 'r', newline='', encoding='utf-8-sig') as f:
                reader = csv.DictReader(f)
                if reader.fieldnames is None:
                    raise ValueError("文件为空或缺少表头")
                columns = list(reader.fieldnames)
                rows = [
                    DataPoint({key: self._convert_value(value) for key, value in row.items()})
                    for row in reader
                ]
        except (OSError, ValueError, csv.Error) as e:
            print(f"加载CSV文件失败: {e}")
            return False

        self.columns = columns
        self.data = rows
        print(f"成功加载 {len(self.data)} 行数据,{len(self.columns)} 列")
        return True

    def _convert_value(self, value: str):
        """Turn a raw CSV cell into None, an int, a float or a stripped string.

        Empty or whitespace-only cells (and None) become None. Integers are
        tried first, then floats, which also covers scientific notation such
        as ``1e3``. Text that parses to NaN or infinity is kept as a string
        so that it cannot leak into the statistics. Non-string input is
        returned unchanged.
        """
        import math

        if value is None:
            return None
        if not isinstance(value, str):
            return value

        text = value.strip()
        if not text:
            return None

        try:
            return int(text)
        except ValueError:
            pass

        try:
            number = float(text)
        except ValueError:
            return text
        return number if math.isfinite(number) else text

    def get_basic_stats(self, column: str):
        """Compute descriptive statistics over the numeric cells of ``column``.

        Returns:
            A dict with the keys count, mean, median, mode, min, max and std,
            or None (after printing the reason) when the column has no values
            or no numeric values. ``mode`` is the string "无众数" when no
            value occurs more than once; ``std`` is 0 for a single value.
        """
        values = [point.get_value(column) for point in self.data if point.get_value(column) is not None]

        if not values:
            print(f"列 '{column}' 没有有效数据")
            return None

        numeric_values = [v for v in values if isinstance(v, (int, float))]

        if not numeric_values:
            print(f"列 '{column}' 不是数值类型")
            return None

        mode = "无众数"
        if len(set(numeric_values)) < len(numeric_values):
            try:
                mode = statistics.mode(numeric_values)
            except statistics.StatisticsError:
                # Python < 3.8 raises when several values tie for most common.
                mode = "无众数"

        stats = {
            "count": len(numeric_values),
            "mean": statistics.mean(numeric_values),
            "median": statistics.median(numeric_values),
            "mode": mode,
            "min": min(numeric_values),
            "max": max(numeric_values),
            "std": statistics.stdev(numeric_values) if len(numeric_values) > 1 else 0
        }

        return stats

    def filter_data(self, column: str, condition: str, value: Any):
        """Return the rows whose ``column`` cell satisfies ``condition value``.

        Args:
            column: Name of the column to test.
            condition: One of ``==``, ``!=``, ``>``, ``<``, ``>=``, ``<=``.
            value: Right-hand side of the comparison.

        Returns:
            A list of matching ``DataPoint`` objects. Ordering comparisons
            only match numeric cells and yield an empty list when ``value``
            itself is not numeric; an unsupported condition is reported and
            also yields an empty list.
        """
        numeric = (int, float)
        equality = {
            "==": lambda cell, ref: cell == ref,
            "!=": lambda cell, ref: cell != ref,
        }
        ordering = {
            ">": lambda cell, ref: cell > ref,
            "<": lambda cell, ref: cell < ref,
            ">=": lambda cell, ref: cell >= ref,
            "<=": lambda cell, ref: cell <= ref,
        }

        if condition in equality:
            matches = equality[condition]
            return [
                point for point in self.data
                if matches(point.get_value(column), value)
            ]

        if condition in ordering:
            # Comparing a number with e.g. a string raises TypeError, so a
            # non-numeric reference value can never match anything.
            if not isinstance(value, numeric):
                return []
            matches = ordering[condition]
            return [
                point for point in self.data
                if isinstance(point.get_value(column), numeric)
                and matches(point.get_value(column), value)
            ]

        print(f"不支持的条件: {condition}")
        return []

    def group_by(self, group_column: str, agg_column: str, agg_func: str = "count"):
        """Group rows by ``group_column`` and aggregate ``agg_column``.

        Args:
            group_column: Column whose values define the groups.
            agg_column: Column whose non-None values are aggregated.
            agg_func: ``count``, ``sum``, ``mean``, ``max`` or ``min``.

        Returns:
            A dict mapping each group value to the aggregate. ``count``
            counts all non-None cells. The numeric aggregations use only the
            numeric cells of a group and omit groups that have none. An
            unknown ``agg_func`` gives an empty dict.
        """
        groups = {}
        for point in self.data:
            bucket = groups.setdefault(point.get_value(group_column), [])
            agg_value = point.get_value(agg_column)
            if agg_value is not None:
                bucket.append(agg_value)

        if agg_func == "count":
            return {group: len(values) for group, values in groups.items()}

        reducers = {
            "sum": sum,
            "mean": lambda numbers: sum(numbers) / len(numbers),
            "max": max,
            "min": min,
        }
        reducer = reducers.get(agg_func)

        result = {}
        if reducer is None:
            return result

        for group, values in groups.items():
            # Check every value, not just the first, so mixed columns work.
            numbers = [v for v in values if isinstance(v, (int, float))]
            if numbers:
                result[group] = reducer(numbers)

        return result

    def generate_report(self):
        """Print an overview of the data followed by per-column statistics.

        Columns for which ``get_basic_stats`` returns None are skipped.
        """
        if not self.data:
            print("没有数据可分析")
            return

        print("\n" + "="*60)
        print("数据分析报告")
        print("="*60)

        print(f"数据概览:")
        print(f"  总行数: {len(self.data)}")
        print(f"  总列数: {len(self.columns)}")
        print(f"  列名: {', '.join(self.columns)}")

        print(f"\n各列统计:")
        for column in self.columns:
            stats = self.get_basic_stats(column)
            if stats:
                print(f"\n{column}:")
                print(f"  有效值数量: {stats['count']}")
                print(f"  平均值: {stats['mean']:.2f}")
                print(f"  中位数: {stats['median']:.2f}")
                print(f"  最小值: {stats['min']}")
                print(f"  最大值: {stats['max']}")
                print(f"  标准差: {stats['std']:.2f}")

    def export_results(self, filename: str, data: Optional[List["DataPoint"]] = None):
        """Write rows to ``filename`` as CSV using the loaded column order.

        Args:
            filename: Destination path.
            data: Rows to write; defaults to all loaded rows.

        Returns:
            True on success, False if the file could not be written.
        """
        if data is None:
            data = self.data

        try:
            with open(filename, 'w', newline='', encoding='utf-8') as f:
                writer = csv.writer(f)
                writer.writerow(self.columns)
                writer.writerows(
                    [point.get_value(col) for col in self.columns]
                    for point in data
                )

            print(f"结果已导出到 {filename}")
            return True
        except (OSError, csv.Error) as e:
            print(f"导出失败: {e}")
            return False

def main():
    """Run the interactive menu of the data analysis tool until the user picks 0."""
    analyzer = DataAnalyzer()
    ordering_ops = (">", "<", ">=", "<=")
    conditions = ("==", "!=") + ordering_ops
    agg_funcs = ("count", "sum", "mean", "max", "min")

    while True:
        print("\n" + "="*50)
        print("数据分析工具")
        print("="*50)
        print("1. 加载CSV文件")
        print("2. 查看数据概览")
        print("3. 基本统计分析")
        print("4. 数据过滤")
        print("5. 分组聚合")
        print("6. 生成报告")
        print("7. 导出结果")
        print("0. 退出")

        choice = input("\n请选择操作 (0-7): ").strip()

        if choice == "1":
            filename = input("CSV文件名: ").strip()
            if filename:
                analyzer.load_csv(filename)

        elif choice == "2":
            if analyzer.data:
                print(f"\n数据概览:")
                print(f"行数: {len(analyzer.data)}")
                print(f"列数: {len(analyzer.columns)}")
                print(f"列名: {', '.join(analyzer.columns)}")

                print(f"\n前5行数据:")
                for i, point in enumerate(analyzer.data[:5]):
                    print(f"第{i+1}行: {point.values}")
            else:
                print("请先加载数据")

        elif choice == "3":
            if analyzer.data:
                column = input("请输入列名: ").strip()
                if column in analyzer.columns:
                    stats = analyzer.get_basic_stats(column)
                    if stats:
                        print(f"\n{column} 统计信息:")
                        for key, value in stats.items():
                            print(f"  {key}: {value}")
                else:
                    print("列名不存在")
            else:
                print("请先加载数据")

        elif choice == "4":
            if not analyzer.data:
                print("请先加载数据")
                continue

            # Validate each answer like the other branches do, instead of
            # silently reporting zero matches for a typo.
            column = input("列名: ").strip()
            if column not in analyzer.columns:
                print("列名不存在")
                continue

            condition = input("条件 (==, !=, >, <, >=, <=): ").strip()
            if condition not in conditions:
                print("无效条件")
                continue

            value_input = input("值: ").strip()
            value = analyzer._convert_value(value_input)

            # Ordering a number against text raises TypeError, so reject it
            # here rather than letting the comparison crash the program.
            if condition in ordering_ops and not isinstance(value, (int, float)):
                print("比较条件需要数值")
                continue

            filtered = analyzer.filter_data(column, condition, value)
            print(f"过滤结果: {len(filtered)} 行")

            if filtered:
                show_details = input("显示详细信息? (y/N): ").strip().lower() == 'y'
                if show_details:
                    # Only the first 10 matches are shown.
                    for i, point in enumerate(filtered[:10]):
                        print(f"第{i+1}行: {point.values}")

        elif choice == "5":
            if not analyzer.data:
                print("请先加载数据")
                continue

            group_column = input("分组列: ").strip()
            agg_column = input("聚合列: ").strip()
            agg_func = input("聚合函数 (count, sum, mean, max, min): ").strip()

            if group_column not in analyzer.columns or agg_column not in analyzer.columns:
                print("列名不存在")
            elif agg_func not in agg_funcs:
                print("无效的聚合函数")
            else:
                result = analyzer.group_by(group_column, agg_column, agg_func)
                print(f"\n分组聚合结果:")
                for group, value in result.items():
                    print(f"  {group}: {value}")

        elif choice == "6":
            analyzer.generate_report()

        elif choice == "7":
            if analyzer.data:
                filename = input("导出文件名: ").strip()
                if filename:
                    analyzer.export_results(filename)
            else:
                print("请先加载数据")

        elif choice == "0":
            print("感谢使用数据分析工具!")
            break

        else:
            print("无效选择,请重试")


if __name__ == "__main__":
    main()

🎯 项目5:网络爬虫

项目描述

创建一个简单的网络爬虫,用于抓取网页内容并保存到文件。

功能需求

  • 抓取网页内容
  • 解析HTML
  • 提取特定信息
  • 保存到文件

实现代码

python
import csv
import json
import re
import time
from collections import deque
from typing import Dict, List, Optional, Set
from urllib.parse import urldefrag, urljoin, urlparse

import requests

class WebCrawler:
    """A small regex-based web crawler built on a shared ``requests`` session.

    The crawler fetches pages, extracts the title, visible text, e-mail
    addresses, phone numbers and links with regular expressions, and can
    save or summarize the collected page records.

    Attributes are documented where they are assigned in ``__init__``.
    """

    def __init__(self, delay: float = 1.0):
        """Create a crawler.

        Args:
            delay: Seconds to sleep after each successfully processed page.
        """
        self.delay = delay
        # URLs that were fetched successfully; used to avoid fetching twice.
        self.visited_urls: Set[str] = set()
        # One session is reused for every request so headers are set once.
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })

    def fetch_page(self, url: str) -> Optional["requests.Response"]:
        """Download *url* with a 10 second timeout.

        The response type is quoted so the annotation is not evaluated when
        the class is defined.

        Args:
            url: Address to request.

        Returns:
            The response, or ``None`` when ``requests`` raises a
            ``RequestException`` (including the error raised by
            ``raise_for_status()``); the failure is printed.
        """
        try:
            print(f"正在抓取: {url}")
            response = self.session.get(url, timeout=10)
            response.raise_for_status()
            return response
        except requests.RequestException as e:
            print(f"抓取失败 {url}: {e}")
            return None

    def extract_links(self, html: str, base_url: str) -> List[str]:
        """Collect the absolute http(s) links referenced by ``href`` attributes.

        Args:
            html: Page markup to scan.
            base_url: URL used to resolve relative links.

        Returns:
            Unique absolute URLs in order of first appearance, with any
            ``#fragment`` removed so in-page anchors do not produce
            duplicates of the same page.
        """
        links = []
        # Simple regex-based extraction; this is not a full HTML parser.
        link_pattern = r'href=["\']([^"\']+)["\']'

        for href in re.findall(link_pattern, html, re.IGNORECASE):
            try:
                absolute_url, _fragment = urldefrag(urljoin(base_url, href))
            except ValueError:
                # urljoin rejects some malformed hrefs (e.g. a broken IPv6
                # literal); skip that link instead of aborting the page.
                continue
            if self.is_valid_url(absolute_url):
                links.append(absolute_url)

        # dict.fromkeys removes duplicates while keeping a stable order.
        return list(dict.fromkeys(links))

    def extract_text(self, html: str) -> str:
        """Strip markup from *html* and return the remaining text.

        ``<script>`` and ``<style>`` elements are removed together with their
        content, every other tag is dropped, and runs of whitespace are
        collapsed to a single space.
        """
        text = re.sub(r'<script.*?</script>', '', html, flags=re.DOTALL | re.IGNORECASE)
        text = re.sub(r'<style.*?</style>', '', text, flags=re.DOTALL | re.IGNORECASE)
        text = re.sub(r'<[^>]+>', '', text)
        text = re.sub(r'\s+', ' ', text)
        return text.strip()

    def extract_emails(self, text: str) -> List[str]:
        """Return every e-mail address found in *text*, in order of appearance."""
        # The TLD class is [A-Za-z]; a "|" inside a character class would be
        # a literal pipe, not an alternation.
        email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
        return re.findall(email_pattern, text)

    def extract_phones(self, text: str) -> List[str]:
        """Return 10-digit phone numbers (3-3-4, optional ``-`` or ``.`` separators)."""
        phone_pattern = r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b'
        return re.findall(phone_pattern, text)

    def is_valid_url(self, url: str) -> bool:
        """Return ``True`` when *url* has a host and an http or https scheme."""
        try:
            parsed = urlparse(url)
        except ValueError:
            # urlparse raises ValueError for URLs it cannot split.
            return False
        return bool(parsed.netloc) and parsed.scheme in ('http', 'https')

    def crawl_single_page(self, url: str) -> Optional[Dict]:
        """Fetch one page and extract its data.

        Args:
            url: Page to crawl.

        Returns:
            A dict with the keys ``url``, ``title``, ``text`` (first 1000
            characters), ``emails``, ``phones``, ``links``, ``status_code``,
            ``content_length`` and ``timestamp``; or ``None`` when the URL
            was already visited or could not be fetched.
        """
        if url in self.visited_urls:
            return None

        response = self.fetch_page(url)
        if not response:
            return None

        self.visited_urls.add(url)

        html = response.text
        text = self.extract_text(html)

        data = {
            'url': url,
            'title': self.extract_title(html),
            'text': text[:1000],  # keep the stored text short
            'emails': self.extract_emails(text),
            'phones': self.extract_phones(text),
            'links': self.extract_links(html, url),
            'status_code': response.status_code,
            'content_length': len(html),
            'timestamp': time.strftime('%Y-%m-%d %H:%M:%S')
        }

        time.sleep(self.delay)  # pause between requests
        return data

    def extract_title(self, html: str) -> str:
        """Return the stripped content of the first ``<title>`` element.

        Attributes on the tag (``<title lang="en">``) are accepted. When no
        title element is found the placeholder "无标题" is returned.
        """
        title_match = re.search(
            r'<title\b[^>]*>(.*?)</title>', html, re.IGNORECASE | re.DOTALL
        )
        if title_match:
            return title_match.group(1).strip()
        return "无标题"

    def crawl_website(self, start_url: str, max_pages: int = 10) -> List[Dict]:
        """Crawl breadth-first from *start_url*, following discovered links.

        Args:
            start_url: First URL to fetch.
            max_pages: Stop once this many pages were crawled successfully.

        Returns:
            The page records produced by ``crawl_single_page`` in crawl order.
        """
        results: List[Dict] = []
        pending = deque([start_url])
        # Every URL ever queued; this keeps membership checks O(1) and stops
        # a URL whose fetch failed from being queued again later.
        queued: Set[str] = {start_url}

        while pending and len(results) < max_pages:
            current_url = pending.popleft()

            if current_url in self.visited_urls:
                continue

            page_data = self.crawl_single_page(current_url)
            if page_data:
                results.append(page_data)

                # Queue links that were neither fetched nor queued before.
                for link in page_data['links']:
                    if link not in self.visited_urls and link not in queued:
                        queued.add(link)
                        pending.append(link)

        return results

    def save_to_json(self, data: List[Dict], filename: str):
        """Write *data* to *filename* as indented UTF-8 JSON.

        Any exception is caught and printed instead of being raised.
        """
        try:
            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
            print(f"数据已保存到 {filename}")
        except Exception as e:
            print(f"保存JSON文件失败: {e}")

    def save_to_csv(self, data: List[Dict], filename: str):
        """Write *data* to *filename* as CSV, one row per page record.

        The header comes from the keys of the first record. Nothing is
        written when *data* is empty. Any exception is caught and printed
        instead of being raised.
        """
        try:
            if not data:
                print("没有数据可保存")
                return

            with open(filename, 'w', newline='', encoding='utf-8') as f:
                writer = csv.DictWriter(f, fieldnames=data[0].keys())
                writer.writeheader()
                writer.writerows(data)
            print(f"数据已保存到 {filename}")
        except Exception as e:
            print(f"保存CSV文件失败: {e}")

    def generate_report(self, data: List[Dict]):
        """Print a summary of *data*: totals, the page list and unique e-mails."""
        if not data:
            print("没有爬取到数据")
            return

        print("\n" + "="*60)
        print("爬取报告")
        print("="*60)

        total_pages = len(data)
        total_emails = sum(len(page['emails']) for page in data)
        total_phones = sum(len(page['phones']) for page in data)
        total_links = sum(len(page['links']) for page in data)

        print(f"爬取页面数: {total_pages}")
        print(f"发现邮箱数: {total_emails}")
        print(f"发现电话数: {total_phones}")
        print(f"发现链接数: {total_links}")

        print("\n页面列表:")
        for i, page in enumerate(data, 1):
            print(f"{i}. {page['title']} - {page['url']}")

        # Merge the e-mail addresses of all pages, dropping duplicates.
        all_emails = set()
        for page in data:
            all_emails.update(page['emails'])

        if all_emails:
            print("\n发现的邮箱:")
            for email in sorted(all_emails):
                print(f"  {email}")

def main():
    """Run the interactive command-line menu of the web crawler.

    The loop reads a menu choice and dispatches to the crawler: crawl one
    page, crawl a site, list visited URLs, or save the collected pages as
    JSON or CSV. Pages crawled during the session are kept in memory so the
    save options have data to write. The loop ends when the user enters "0".
    """
    crawler = WebCrawler(delay=1.0)
    # Page records collected by options 1 and 2; written out by options 4 and 5.
    results = []

    while True:
        print("\n" + "="*50)
        print("网络爬虫工具")
        print("="*50)
        print("1. 爬取单个页面")
        print("2. 爬取网站")
        print("3. 查看爬取结果")
        print("4. 保存为JSON")
        print("5. 保存为CSV")
        print("0. 退出")

        choice = input("\n请选择操作 (0-5): ").strip()

        if choice == "1":
            url = input("请输入URL: ").strip()
            if url:
                data = crawler.crawl_single_page(url)
                if data:
                    results.append(data)
                    print(f"\n页面标题: {data['title']}")
                    print(f"页面URL: {data['url']}")
                    print(f"文本长度: {len(data['text'])}")
                    print(f"发现邮箱: {len(data['emails'])}")
                    print(f"发现电话: {len(data['phones'])}")
                    print(f"发现链接: {len(data['links'])}")

        elif choice == "2":
            url = input("请输入起始URL: ").strip()
            raw_max_pages = input("最大爬取页面数 (默认10): ").strip()
            try:
                max_pages = int(raw_max_pages) if raw_max_pages else 10
            except ValueError:
                # Non-numeric input falls back to the default instead of
                # crashing the whole menu loop.
                print("无效的页面数,使用默认值 10")
                max_pages = 10

            if url:
                print("开始爬取...")
                data = crawler.crawl_website(url, max_pages)
                results.extend(data)
                crawler.generate_report(data)

        elif choice == "3":
            if crawler.visited_urls:
                print(f"已访问的URL ({len(crawler.visited_urls)}):")
                for url in crawler.visited_urls:
                    print(f"  {url}")
            else:
                print("还没有爬取任何页面")

        elif choice == "4":
            filename = input("JSON文件名 (默认: crawl_results.json): ").strip()
            if not filename:
                filename = "crawl_results.json"

            if not results:
                print("请先爬取一些页面")
            else:
                crawler.save_to_json(results, filename)

        elif choice == "5":
            filename = input("CSV文件名 (默认: crawl_results.csv): ").strip()
            if not filename:
                filename = "crawl_results.csv"

            if not results:
                print("请先爬取一些页面")
            else:
                crawler.save_to_csv(results, filename)

        elif choice == "0":
            print("感谢使用爬虫工具!")
            break

        else:
            print("无效选择,请重试")

# Start the interactive crawler menu only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()

📚 学习资源

推荐书籍

  • 《Python编程:从入门到实践》
  • 《流畅的Python》
  • 《Python Cookbook》
  • 《Effective Python》

在线资源

实践建议

  1. 从简单开始:先完成基础项目,再挑战复杂项目
  2. 注重代码质量:编写清晰、可读的代码
  3. 测试驱动:为你的代码编写测试
  4. 版本控制:使用Git管理代码
  5. 持续学习:关注Python社区和新技术

🎯 下一步学习方向

进阶主题

  • Web开发:Django、Flask、FastAPI
  • 数据科学:Pandas、NumPy、Matplotlib
  • 机器学习:Scikit-learn、TensorFlow、PyTorch
  • 自动化:Selenium、Requests、BeautifulSoup
  • GUI开发:Tkinter、PyQt、Kivy

项目扩展

  • 为现有项目添加Web界面
  • 实现数据库存储
  • 添加用户认证系统
  • 部署到云平台
  • 添加API接口

💡 记住:编程是一门实践性很强的技能,多动手、多思考、多总结。