0%

Python 自动化脚本集锦

一、前言

Python 被称为“胶水语言”,凭借其简洁的语法和丰富的库,成为自动化任务的首选工具。无论是文件批处理、数据分析、网络请求,还是系统运维,Python 都能提供高效的解决方案。

本文精选 16 个实用的 Python 自动化脚本,涵盖日常工作场景,所有代码都经过测试,可直接运行。


二、文件处理自动化

1. 批量重命名文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
#!/usr/bin/env python3
"""
批量重命名文件
用途:将文件按规则重命名,如添加前缀、替换字符、序号等
"""

import os
import re
from pathlib import Path
from datetime import datetime

def batch_rename(directory, pattern, replacement, prefix='', suffix='', start_num=1):
    """Batch-rename the files in *directory*.

    Applies the regex substitution ``pattern -> replacement`` to each file
    name, wraps the resulting stem with *prefix*/*suffix*, and prepends a
    zero-padded sequence number starting at *start_num*.

    Args:
        directory: Target directory.
        pattern: Regex (or plain string) matched against the file name.
        replacement: Replacement text for the matched pattern.
        prefix: Text inserted before the stem.
        suffix: Text appended after the stem (before the extension).
        start_num: First sequence number.
    """
    dir_path = Path(directory)

    # Only real files take part in the numbering (the original also counted
    # subdirectories); sort for a deterministic order.
    files = sorted(p for p in dir_path.iterdir() if p.is_file())

    for i, file in enumerate(files, start=start_num):
        # Apply the substitution first.  (The original computed this and then
        # discarded the result when building the prefix/suffix name.)
        substituted = re.sub(pattern, replacement, file.name)

        # Split the substituted name into stem/extension and wrap the stem.
        stem, dot, ext = substituted.rpartition('.')
        if dot and stem:
            new_name = f"{prefix}{stem}{suffix}.{ext}"
        else:
            new_name = f"{prefix}{substituted}{suffix}"

        # Prepend the zero-padded sequence number.
        new_name = f"{i:03d}_{new_name}"

        new_path = dir_path / new_name
        if not new_path.exists():
            file.rename(new_path)
            print(f"Renamed: {file.name} -> {new_name}")
        else:
            print(f"Skipped (exists): {new_name}")

if __name__ == "__main__":
    # Example: prefix every .txt file with today's date while the regex
    # substitution strips the ".txt" extension.
    today = datetime.now().strftime('%Y%m%d')
    batch_rename(
        directory="./documents",
        pattern=r"\.txt$",
        replacement="",
        prefix=f"{today}_"
    )

2. 批量转换图片格式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
#!/usr/bin/env python3
"""
批量转换图片格式
用途:将图片批量转换为指定格式,可调整大小、质量
"""

from pathlib import Path
from PIL import Image
import argparse

def convert_images(input_dir, output_dir, format='JPEG', quality=90, resize=None):
    """Batch-convert the images found in *input_dir* into *output_dir*.

    Args:
        input_dir: Directory scanned (non-recursively) for image files.
        output_dir: Destination directory (created if missing).
        format: Target format understood by Pillow (JPEG, PNG, WEBP, ...).
        quality: Encoder quality, 1-100.
        resize: Optional (width, height) tuple; resampled with LANCZOS
            when given.
    """
    input_path = Path(input_dir)
    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    # Extensions treated as images (compared lower-cased below).
    image_extensions = {'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp', '.tiff'}

    converted = 0
    for file in input_path.iterdir():
        if file.suffix.lower() not in image_extensions:
            continue
        try:
            # Context manager closes the underlying file handle even on
            # error — the original leaked one handle per image.
            with Image.open(file) as img:
                if resize:
                    img = img.resize(resize, Image.Resampling.LANCZOS)

                output_file = output_path / f"{file.stem}.{format.lower()}"

                # JPEG cannot store an alpha channel, so force RGB first.
                # (The original duplicated the identical save() call in
                # both branches.)
                if format.upper() == 'JPEG':
                    img = img.convert('RGB')
                img.save(output_file, format=format, quality=quality)

            print(f"Converted: {file.name} -> {output_file.name}")
            converted += 1
        except Exception as e:
            print(f"Error converting {file.name}: {e}")

    print(f"\nTotal converted: {converted} images")

if __name__ == "__main__":
    # Example: shrink all photos to 1080p WEBP at quality 85.
    convert_images(
        input_dir="./photos",
        output_dir="./converted",
        format='WEBP',
        quality=85,
        resize=(1920, 1080)
    )

3. PDF 合并与分割

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
#!/usr/bin/env python3
"""
PDF 合并与分割
用途:合并多个 PDF 或分割 PDF 文件
"""

from PyPDF2 import PdfMerger, PdfReader, PdfWriter
from pathlib import Path

def merge_pdfs(input_dir, output_file):
    """Concatenate every *.pdf in *input_dir* (sorted by name) into one file."""
    sources = sorted(Path(input_dir).glob("*.pdf"))

    merger = PdfMerger()
    for source in sources:
        print(f"Adding: {source.name}")
        merger.append(str(source))

    merger.write(output_file)
    merger.close()
    print(f"Merged {len(sources)} PDFs into {output_file}")

def split_pdf(input_file, output_dir, pages_per_file=1):
    """Split *input_file* into chunks of at most *pages_per_file* pages.

    Each chunk is written to *output_dir* as ``page_<first>-<last>.pdf``
    using 1-based, inclusive page numbers.  The original labelled a short
    trailing chunk with a page number past the end of the document.
    """
    reader = PdfReader(input_file)
    Path(output_dir).mkdir(parents=True, exist_ok=True)

    total_pages = len(reader.pages)

    for i in range(0, total_pages, pages_per_file):
        # Clamp the chunk end so the last file name reflects real pages.
        end = min(i + pages_per_file, total_pages)

        writer = PdfWriter()
        for j in range(i, end):
            writer.add_page(reader.pages[j])

        output_file = Path(output_dir) / f"page_{i+1}-{end}.pdf"
        with open(output_file, 'wb') as f:
            writer.write(f)

        print(f"Created: {output_file.name}")

if __name__ == "__main__":
    # Merge every PDF under ./pdfs into a single document.
    merge_pdfs("./pdfs", "merged.pdf")

    # Split a document into 5-page chunks:
    # split_pdf("document.pdf", "./split", pages_per_file=5)

4. Excel 数据处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
#!/usr/bin/env python3
"""
Excel 数据处理
用途:合并多个 Excel 文件、数据清洗、生成报表
"""

import pandas as pd
from pathlib import Path
from openpyxl import load_workbook

def merge_excel_files(input_dir, output_file, sheet_name='Sheet1'):
    """Concatenate one sheet from every *.xlsx in *input_dir* into one file.

    Args:
        input_dir: Directory scanned (non-recursively) for .xlsx files.
        output_file: Path of the merged workbook.
        sheet_name: Sheet read from each source workbook.
    """
    # Sorted for a deterministic row order across runs.
    excel_files = sorted(Path(input_dir).glob("*.xlsx"))

    # pd.concat([]) raises "No objects to concatenate" — bail out early.
    if not excel_files:
        print(f"No .xlsx files found in {input_dir}")
        return

    df_list = []
    for file in excel_files:
        print(f"Reading: {file.name}")
        df_list.append(pd.read_excel(file, sheet_name=sheet_name))

    merged_df = pd.concat(df_list, ignore_index=True)
    merged_df.to_excel(output_file, index=False)
    print(f"Merged {len(excel_files)} files into {output_file}")

def clean_excel_data(input_file, output_file):
    """Clean an Excel sheet: drop empty/duplicate rows, fill NaNs with '',
    drop all-blank columns, then save."""
    df = pd.read_excel(input_file)

    df = (
        df.dropna(how='all')   # rows with no data at all
          .drop_duplicates()   # exact duplicate rows
          .fillna('')          # remaining holes become empty strings
    )

    # Columns that are now entirely empty strings carry nothing useful.
    df = df.loc[:, (df != '').any(axis=0)]

    df.to_excel(output_file, index=False)
    print(f"Cleaned data saved to {output_file}")

def generate_summary(input_file, output_file):
    """Write a one-row summary (row/column counts plus mean/median/std per
    numeric column) of an Excel file to *output_file*."""
    df = pd.read_excel(input_file)

    summary = {
        '总行数': [len(df)],
        '总列数': [len(df.columns)],
        '列名': [', '.join(df.columns)],
    }

    # One set of statistics per numeric column.
    numeric = df.select_dtypes(include=['number'])
    for col in numeric.columns:
        series = numeric[col]
        summary[f'{col}_均值'] = [series.mean()]
        summary[f'{col}_中位数'] = [series.median()]
        summary[f'{col}_标准差'] = [series.std()]

    pd.DataFrame(summary).to_excel(output_file, index=False)
    print(f"Summary saved to {output_file}")

if __name__ == "__main__":
    # Merge workbooks:
    # merge_excel_files("./excel_files", "merged.xlsx")

    # Clean raw data:
    # clean_excel_data("raw_data.xlsx", "clean_data.xlsx")

    # Produce a summary sheet.
    generate_summary("data.xlsx", "summary.xlsx")

三、网络请求自动化

5. 批量检查网站可用性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
#!/usr/bin/env python3
"""
批量检查网站可用性
用途:监控多个网站的响应状态和加载时间
"""

import requests
import concurrent.futures
from datetime import datetime
import csv

def check_website(url, timeout=10):
    """Fetch *url* once; report status code, elapsed seconds and any error.

    Note: 'success' means the request completed, regardless of HTTP status.
    """
    started = datetime.now()
    try:
        response = requests.get(url, timeout=timeout)
    except Exception as exc:
        return {
            'url': url,
            'status': None,
            'response_time': None,
            'success': False,
            'error': str(exc)
        }

    elapsed = (datetime.now() - started).total_seconds()
    return {
        'url': url,
        'status': response.status_code,
        'response_time': elapsed,
        'success': True,
        'error': None
    }

def batch_check_websites(urls, max_workers=10):
    """Check many URLs concurrently; print one status line per site."""
    results = []

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as pool:
        pending = [pool.submit(check_website, url) for url in urls]

        # Report in completion order, not submission order.
        for future in concurrent.futures.as_completed(pending):
            outcome = future.result()
            results.append(outcome)

            marker = "✅" if outcome['success'] else "❌"
            elapsed = outcome['response_time']
            timing = f"{elapsed:.2f}s" if elapsed else "N/A"
            print(f"{marker} {outcome['url']} - {outcome['status']} - {timing}")

    return results

def save_results(results, output_file):
    """Write the website-check result dicts to *output_file* as UTF-8 CSV."""
    fieldnames = ['url', 'status', 'response_time', 'success', 'error']
    with open(output_file, 'w', newline='', encoding='utf-8') as handle:
        writer = csv.DictWriter(handle, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(results)

if __name__ == "__main__":
    # Sites to monitor.
    websites = [
        'https://www.google.com',
        'https://www.github.com',
        'https://www.stackoverflow.com',
        'https://www.python.org',
        'https://blog.sharezone.cn'
    ]

    results = batch_check_websites(websites)
    save_results(results, 'website_status.csv')

    # Summary line.
    ok = sum(1 for r in results if r['success'])
    print(f"\nTotal: {len(results)}, Success: {ok}, Failed: {len(results) - ok}")

6. API 批量调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
#!/usr/bin/env python3
"""
API 批量调用
用途:批量调用 REST API,支持重试、限流、错误处理
"""

import requests
import time
from datetime import datetime
import json

class APIClient:
    """Minimal REST client with bearer auth, client-side rate limiting and
    retrying batch GETs."""

    def __init__(self, base_url, api_key=None, rate_limit=10):
        """
        Args:
            base_url: API root, without a trailing slash.
            api_key: Optional bearer token sent in the Authorization header.
            rate_limit: Maximum requests per second.
        """
        self.base_url = base_url
        self.api_key = api_key
        self.rate_limit = rate_limit  # requests per second
        self.last_request = 0

    def _rate_limit_wait(self):
        """Sleep just long enough to stay under self.rate_limit req/s."""
        elapsed = time.time() - self.last_request
        min_interval = 1 / self.rate_limit
        if elapsed < min_interval:
            time.sleep(min_interval - elapsed)
        self.last_request = time.time()

    def request(self, method, endpoint, **kwargs):
        """Send one request; return the decoded JSON body, or None on error."""
        self._rate_limit_wait()

        url = f"{self.base_url}/{endpoint}"
        headers = kwargs.pop('headers', {})

        if self.api_key:
            headers['Authorization'] = f'Bearer {self.api_key}'

        try:
            response = requests.request(method, url, headers=headers, **kwargs)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"Error: {e}")
            return None

    def batch_get(self, endpoints, max_retries=3):
        """GET each endpoint with up to *max_retries* attempts each
        (exponential backoff).  Returns a list of per-endpoint result dicts."""
        results = []

        for i, endpoint in enumerate(endpoints, 1):
            print(f"[{i}/{len(endpoints)}] Requesting: {endpoint}")

            for attempt in range(max_retries):
                result = self.request('GET', endpoint)
                # Compare against None: an empty dict/list or 0 is a valid
                # JSON body — the original's truthiness test retried those.
                if result is not None:
                    results.append({'endpoint': endpoint, 'success': True, 'data': result})
                    break
                if attempt < max_retries - 1:
                    print(f" Retry {attempt + 1}/{max_retries}")
                    time.sleep(2 ** attempt)  # exponential backoff
                else:
                    results.append({'endpoint': endpoint, 'success': False, 'data': None})

        return results

if __name__ == "__main__":
    # Example: fetch several GitHub repos with client-side rate limiting.
    client = APIClient(
        base_url='https://api.github.com',
        rate_limit=5  # stay under the GitHub API limit
    )

    repos = ['johnzok/repo1', 'johnzok/repo2', 'johnzok/repo3']
    endpoints = [f'repos/{repo}' for repo in repos]

    results = client.batch_get(endpoints)

    # Persist the raw responses.
    with open('api_results.json', 'w') as f:
        json.dump(results, f, indent=2)

7. 网页数据抓取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
#!/usr/bin/env python3
"""
网页数据抓取
用途:从网页提取结构化数据
"""

import requests
from bs4 import BeautifulSoup
import pandas as pd
from datetime import datetime

def scrape_articles(url):
    """Scrape the article list page at *url* into a list of dicts
    (title, url, date, scraped_at)."""
    soup = BeautifulSoup(requests.get(url).text, 'html.parser')

    articles = []
    # Selectors must be adapted to the real page structure.
    for item in soup.select('.article-item'):
        title = item.select_one('.title')
        link = item.select_one('a')
        if not (title and link):
            continue

        date = item.select_one('.date')
        articles.append({
            'title': title.get_text(strip=True),
            'url': link.get('href'),
            'date': date.get_text(strip=True) if date else None,
            'scraped_at': datetime.now().isoformat()
        })

    return articles

def scrape_product_details(url):
    """Scrape name/price/description/rating from a product page.

    Returns a dict with None for any field whose selector matches nothing.
    """
    response = requests.get(url)
    soup = BeautifulSoup(response.text, 'html.parser')

    def first_text(selector):
        # One DOM query per field — the original ran every selector twice
        # (once for the existence test, once for the text).
        node = soup.select_one(selector)
        return node.get_text(strip=True) if node else None

    return {
        'name': first_text('.product-name'),
        'price': first_text('.price'),
        'description': first_text('.description'),
        'rating': first_text('.rating'),
    }

def save_to_csv(data, filename):
    """Save a list of dicts to *filename* as CSV (UTF-8 with BOM for Excel).

    Does nothing when *data* is empty.
    """
    if not data:
        return
    df = pd.DataFrame(data)
    df.to_csv(filename, index=False, encoding='utf-8-sig')
    # The original printed the literal "(unknown)" instead of the file name.
    print(f"Saved {len(data)} records to {filename}")

if __name__ == "__main__":
    # Scrape an article index page and dump it to CSV.
    articles = scrape_articles('https://blog.example.com/articles')
    save_to_csv(articles, 'articles.csv')

    # Single product example:
    # product = scrape_product_details('https://shop.example.com/product/123')
    # print(product)

四、系统运维自动化

8. 服务器监控脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
#!/usr/bin/env python3
"""
服务器监控脚本
用途:监控 CPU、内存、磁盘、网络等系统资源
"""

import psutil
import json
from datetime import datetime
import smtplib
from email.mime.text import MIMEText

def get_system_info():
    """Snapshot CPU, memory, disk and network usage as a JSON-able dict."""
    import socket  # stdlib; psutil has no hostname() — the original raised AttributeError here

    # Snapshot each psutil source once and reuse (the original queried
    # virtual_memory / net_io_counters / cpu_freq repeatedly).
    mem = psutil.virtual_memory()
    net = psutil.net_io_counters()
    freq = psutil.cpu_freq()

    info = {
        'timestamp': datetime.now().isoformat(),
        'hostname': socket.gethostname(),
        'cpu': {
            'percent': psutil.cpu_percent(interval=1),
            'cores': psutil.cpu_count(),
            'freq': freq.current if freq else None
        },
        'memory': {
            'total': mem.total,
            'available': mem.available,
            'percent': mem.percent
        },
        'disk': [],
        'network': {
            'bytes_sent': net.bytes_sent,
            'bytes_recv': net.bytes_recv
        }
    }

    # Per-partition disk usage; some mount points are unreadable — skip them.
    for partition in psutil.disk_partitions():
        try:
            usage = psutil.disk_usage(partition.mountpoint)
        except PermissionError:
            continue
        info['disk'].append({
            'device': partition.device,
            'mountpoint': partition.mountpoint,
            'percent': usage.percent
        })

    return info

def check_thresholds(info, thresholds=None):
    """Compare a get_system_info() snapshot against alert thresholds.

    Args:
        info: Snapshot dict with 'cpu', 'memory' and 'disk' entries.
        thresholds: Optional dict overriding the default percent limits.

    Returns:
        List of human-readable alert strings (empty when all is well).
    """
    if thresholds is None:
        thresholds = {
            'cpu_percent': 80,
            'memory_percent': 80,
            'disk_percent': 90
        }

    alerts = []

    cpu_used = info['cpu']['percent']
    if cpu_used > thresholds['cpu_percent']:
        alerts.append(f"CPU 使用率过高:{cpu_used}%")

    mem_used = info['memory']['percent']
    if mem_used > thresholds['memory_percent']:
        alerts.append(f"内存使用率过高:{mem_used}%")

    alerts.extend(
        f"磁盘 {disk['mountpoint']} 使用率过高:{disk['percent']}%"
        for disk in info['disk']
        if disk['percent'] > thresholds['disk_percent']
    )

    return alerts

def send_alert(email_to, alerts):
    """Compose (and, once SMTP is configured, send) an alert e-mail."""
    body = '\n'.join(alerts)
    msg = MIMEText(body)
    msg['Subject'] = '服务器监控告警'
    msg['From'] = 'monitor@example.com'
    msg['To'] = email_to

    # Fill in real SMTP credentials to actually deliver the message:
    # with smtplib.SMTP('smtp.example.com') as server:
    #     server.login('user', 'password')
    #     server.send_message(msg)

    print("Alert sent:", alerts)

def monitor_loop(interval=60):
    """Poll the system every *interval* seconds, alerting on threshold
    breaches and appending each snapshot to a per-day JSONL file.

    Runs until interrupted (Ctrl-C).
    """
    import time  # stdlib; the original called time.sleep without importing time

    while True:
        info = get_system_info()
        alerts = check_thresholds(info)

        if alerts:
            send_alert('admin@example.com', alerts)

        # One JSON object per line, one file per day.
        log_name = f"monitor_{datetime.now().strftime('%Y%m%d')}.json"
        with open(log_name, 'a') as f:
            f.write(json.dumps(info) + '\n')

        time.sleep(interval)

if __name__ == "__main__":
    # One-shot check: print the snapshot and any threshold alerts.
    info = get_system_info()
    print(json.dumps(info, indent=2))

    alerts = check_thresholds(info)
    if alerts:
        print("\n⚠️ Alerts:")
        for alert in alerts:
            print(f" - {alert}")

    # Run forever instead:
    # monitor_loop(interval=60)

9. 日志分析脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
#!/usr/bin/env python3
"""
日志分析脚本
用途:分析日志文件,提取错误、统计访问等
"""

import re
from collections import Counter
from datetime import datetime
from pathlib import Path

def analyze_access_log(log_file):
    """Aggregate an Nginx/Apache access log.

    Returns a dict with request totals, unique IPs, top IPs/URLs,
    status-code and method distributions, and total bytes sent.
    """
    # Compiled once — the log may have millions of lines.
    line_re = re.compile(
        r'(\d+\.\d+\.\d+\.\d+).*?"(\w+) (.*?) HTTP.*?" (\d+) (\d+)'
    )

    ips = Counter()
    urls = Counter()
    status_codes = Counter()
    methods = Counter()
    total_bytes = 0

    with open(log_file, 'r') as handle:
        for raw in handle:
            hit = line_re.search(raw)
            if not hit:
                continue
            ip, method, url, status, size = hit.groups()
            ips[ip] += 1
            urls[url] += 1
            status_codes[status] += 1
            methods[method] += 1
            total_bytes += int(size)

    return {
        'total_requests': sum(ips.values()),
        'unique_ips': len(ips),
        'top_ips': ips.most_common(10),
        'top_urls': urls.most_common(10),
        'status_codes': dict(status_codes),
        'methods': dict(methods),
        'total_bytes': total_bytes
    }

def analyze_error_log(log_file):
    """Summarise ERROR/CRITICAL lines in a log file.

    Returns the total error count, the 10 most common error "types"
    (the text between the severity keyword and the first ':'), and the
    last 20 matching lines.
    """
    errors = Counter()
    error_lines = []

    with open(log_file, 'r') as f:
        for line in f:
            # The original unconditionally split on 'ERROR', which raised
            # IndexError for lines that only contain 'CRITICAL'.
            for keyword in ('ERROR', 'CRITICAL'):
                if keyword in line:
                    tail = line.split(keyword, 1)[1]
                    errors[tail.split(':')[0].strip()] += 1
                    error_lines.append(line.strip())
                    break

    return {
        'total_errors': len(error_lines),
        'error_types': dict(errors.most_common(10)),
        'recent_errors': error_lines[-20:]
    }

def generate_report(analysis, output_file):
    """Render an access-log analysis dict as a Markdown report."""
    lines = [
        "# 日志分析报告\n",
        f"生成时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n",
        "## 访问统计\n",
        f"- 总请求数:{analysis['total_requests']}\n",
        f"- 独立 IP 数:{analysis['unique_ips']}\n",
        f"- 总流量:{analysis['total_bytes'] / 1024 / 1024:.2f} MB\n\n",
        "## Top 10 IP\n",
    ]
    lines += [f"- {ip}: {count} 次\n" for ip, count in analysis['top_ips']]

    lines.append("\n## 状态码分布\n")
    lines += [f"- {code}: {count} 次\n"
              for code, count in analysis['status_codes'].items()]

    # Single buffered write instead of many small ones.
    with open(output_file, 'w') as report:
        report.writelines(lines)

if __name__ == "__main__":
    # Access log -> Markdown report.
    access_analysis = analyze_access_log('/var/log/nginx/access.log')
    generate_report(access_analysis, 'access_report.md')

    # Error log summary.
    error_analysis = analyze_error_log('/var/log/nginx/error.log')
    print(f"Total errors: {error_analysis['total_errors']}")

10. 自动备份脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
#!/usr/bin/env python3
"""
自动备份脚本
用途:备份文件、数据库,支持压缩、加密、上传
"""

import os
import subprocess
import shutil
from pathlib import Path
from datetime import datetime
import boto3 # AWS S3

class BackupManager:
    """Create, upload and expire backups of directories and MySQL databases."""

    def __init__(self, backup_dir, retention_days=30):
        """
        Args:
            backup_dir: Directory where backups are stored (created if missing).
            retention_days: Backups older than this many days are removed by
                cleanup_old_backups().
        """
        self.backup_dir = Path(backup_dir)
        self.backup_dir.mkdir(parents=True, exist_ok=True)
        self.retention_days = retention_days

    def backup_directory(self, source_dir, compress=True):
        """Back up *source_dir* into the backup directory.

        Returns the path of the new backup: a .tar.gz file when *compress*
        is true, otherwise a directory copy.
        """
        source = Path(source_dir)
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_name = f"{source.name}_{timestamp}"

        if compress:
            backup_file = self.backup_dir / f"{backup_name}.tar.gz"
            # check=True surfaces tar failures instead of silently ignoring them.
            subprocess.run(
                ['tar', '-czf', str(backup_file), '-C', str(source.parent), source.name],
                check=True,
            )
        else:
            backup_file = self.backup_dir / backup_name
            shutil.copytree(source, backup_file)

        print(f"Backed up: {source} -> {backup_file}")
        return backup_file

    def backup_mysql(self, database, user, password, host='localhost'):
        """Dump *database* to a gzipped SQL file and return its path.

        NOTE(review): the password appears on the command line and the
        command runs through the shell; prefer a ~/.my.cnf credentials file
        for anything beyond local use.
        """
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_file = self.backup_dir / f"{database}_{timestamp}.sql.gz"

        cmd = f"mysqldump -u{user} -p{password} -h{host} {database} | gzip > {backup_file}"
        subprocess.run(cmd, shell=True, check=True)

        print(f"Backed up MySQL: {database} -> {backup_file}")
        return backup_file

    def upload_to_s3(self, file_path, bucket, prefix='backups'):
        """Upload one backup file to s3://<bucket>/<prefix>/<name>."""
        s3 = boto3.client('s3')
        key = f"{prefix}/{Path(file_path).name}"

        s3.upload_file(str(file_path), bucket, key)
        print(f"Uploaded to S3: s3://{bucket}/{key}")

    def cleanup_old_backups(self):
        """Delete backups whose mtime is older than the retention window."""
        cutoff = datetime.now().timestamp() - (self.retention_days * 86400)

        for entry in self.backup_dir.iterdir():
            if entry.stat().st_mtime >= cutoff:
                continue
            # Uncompressed backups are directories; unlink() cannot remove
            # those (the original raised here), so use rmtree for them.
            if entry.is_dir():
                shutil.rmtree(entry)
            else:
                entry.unlink()
            print(f"Deleted old backup: {entry}")

if __name__ == "__main__":
    backup_mgr = BackupManager('/backup', retention_days=30)

    # Directory backup.
    backup_mgr.backup_directory('/var/www/html')

    # Database backup.
    backup_mgr.backup_mysql('mydb', 'root', 'password')

    # Push archives to S3:
    # for file in backup_mgr.backup_dir.glob('*.tar.gz'):
    #     backup_mgr.upload_to_s3(file, 'my-backup-bucket')

    # Enforce the retention window.
    backup_mgr.cleanup_old_backups()

五、数据处理自动化

11. CSV 数据清洗

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
#!/usr/bin/env python3
"""
CSV 数据清洗
用途:清洗 CSV 数据,处理缺失值、重复值、格式转换
"""

import pandas as pd
import numpy as np

def clean_csv(input_file, output_file):
    """Clean a CSV file and save the result.

    Steps: drop fully empty rows, drop duplicate rows, fill missing values
    (median for numeric columns, 'Unknown' otherwise), parse date-like
    columns, drop all-blank columns.
    """
    df = pd.read_csv(input_file)
    print(f"Original shape: {df.shape}")

    # Fully empty rows carry no information.
    df = df.dropna(how='all')

    # Exact duplicates.
    dup_count = df.duplicated().sum()
    df = df.drop_duplicates()
    print(f"Removed {dup_count} duplicate rows")

    # Missing values: median for numeric columns, 'Unknown' otherwise.
    for column in df.columns:
        n_missing = df[column].isna().sum()
        if not n_missing:
            continue
        if df[column].dtype in ['int64', 'float64']:
            df[column] = df[column].fillna(df[column].median())
        else:
            df[column] = df[column].fillna('Unknown')
        print(f"Filled {n_missing} missing values in {column}")

    # Parse anything that looks like a date/time column.
    for column in df.columns:
        if 'date' in column.lower() or 'time' in column.lower():
            df[column] = pd.to_datetime(df[column], errors='coerce')

    # Drop columns containing only empty strings.
    df = df.loc[:, (df != '').any(axis=0)]

    print(f"Cleaned shape: {df.shape}")
    df.to_csv(output_file, index=False)
    print(f"Saved to {output_file}")

if __name__ == "__main__":
    # Clean the raw export into a new file.
    clean_csv('raw_data.csv', 'clean_data.csv')

12. JSON 数据转换

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
#!/usr/bin/env python3
"""
JSON 数据转换
用途:JSON 与 CSV、Excel 互转,扁平化嵌套 JSON
"""

import json
import pandas as pd
from pathlib import Path

def json_to_csv(json_file, csv_file):
    """Convert a JSON file (object or array of objects) to CSV."""
    with open(json_file, 'r') as handle:
        payload = json.load(handle)

    # A single object becomes a one-row table.
    records = payload if isinstance(payload, list) else [payload]
    pd.DataFrame(records).to_csv(csv_file, index=False)
    print(f"Converted {json_file} to {csv_file}")

def json_to_excel(json_file, excel_file):
    """Convert a JSON file (object or array of objects) to an Excel sheet."""
    with open(json_file, 'r') as handle:
        payload = json.load(handle)

    # A single object becomes a one-row table.
    records = payload if isinstance(payload, list) else [payload]
    pd.DataFrame(records).to_excel(excel_file, index=False)
    print(f"Converted {json_file} to {excel_file}")

def flatten_json(nested_json):
    """Flatten nested dicts/lists into a single dict with dotted keys.

    Example: {'a': {'b': 1}, 'c': [2]} -> {'a.b': 1, 'c.0': 2}.
    A top-level list is flattened element-wise into a list of flat dicts.
    """
    def _walk(node, key_prefix='', sep='.'):
        flat = {}
        if isinstance(node, dict):
            for key, value in node.items():
                child = f"{key_prefix}{sep}{key}" if key_prefix else key
                flat.update(_walk(value, child, sep))
        elif isinstance(node, list):
            for idx, value in enumerate(node):
                child = f"{key_prefix}{sep}{idx}" if key_prefix else str(idx)
                flat.update(_walk(value, child, sep))
        else:
            flat[key_prefix] = node
        return flat

    if isinstance(nested_json, list):
        return [_walk(element) for element in nested_json]
    return _walk(nested_json)

if __name__ == "__main__":
    # JSON -> CSV.
    json_to_csv('data.json', 'data.csv')

    # Flatten a nested document.
    with open('nested.json', 'r') as f:
        nested = json.load(f)

    flat = flatten_json(nested)

    with open('flat.json', 'w') as f:
        json.dump(flat, f, indent=2)

13. 数据可视化生成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
#!/usr/bin/env python3
"""
数据可视化生成
用途:自动生成图表并保存
"""

import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path

def generate_charts(data_file, output_dir):
    """Generate standard charts for a CSV file as PNGs in *output_dir*.

    Produces one histogram per numeric column (up to 5), a correlation
    heatmap when there are at least two numeric columns, and a top-10 bar
    chart per categorical column (up to 3).
    """
    Path(output_dir).mkdir(parents=True, exist_ok=True)
    df = pd.read_csv(data_file)

    sns.set_style("whitegrid")

    numeric_cols = df.select_dtypes(include=['number']).columns

    # Histograms — first five numeric columns.
    for col in numeric_cols[:5]:
        plt.figure(figsize=(10, 6))
        sns.histplot(df[col], kde=True)
        plt.title(f'{col} Distribution')
        plt.savefig(f'{output_dir}/{col}_hist.png', dpi=300, bbox_inches='tight')
        plt.close()

    # Correlation heatmap — needs at least two numeric columns.
    if len(numeric_cols) > 1:
        plt.figure(figsize=(12, 10))
        sns.heatmap(df[numeric_cols].corr(), annot=True, cmap='coolwarm', center=0)
        plt.title('Correlation Heatmap')
        plt.savefig(f'{output_dir}/correlation.png', dpi=300, bbox_inches='tight')
        plt.close()

    # Bar charts — first three categorical columns.
    for col in df.select_dtypes(include=['object']).columns[:3]:
        plt.figure(figsize=(10, 6))
        df[col].value_counts().head(10).plot(kind='bar')
        plt.title(f'Top 10 {col}')
        plt.xticks(rotation=45)
        plt.savefig(f'{output_dir}/{col}_bar.png', dpi=300, bbox_inches='tight')
        plt.close()

    print(f"Generated charts in {output_dir}")

if __name__ == "__main__":
    # Chart the default dataset into ./charts.
    generate_charts('data.csv', './charts')

六、实用工具脚本

14. 密码生成器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
#!/usr/bin/env python3
"""
密码生成器
用途:生成高强度随机密码
"""

import secrets
import string

def generate_password(length=16, use_special=True):
    """Generate a cryptographically secure random password.

    The result always contains at least one lowercase letter, one uppercase
    letter, one digit and — when *use_special* is true — one punctuation
    character.

    Args:
        length: Total password length.  Must be at least 4 (3 when
            use_special is false) so every required class fits.
        use_special: Include punctuation characters.

    Raises:
        ValueError: If *length* is too small to hold the required character
            classes (the original silently returned a wrong-length password).
    """
    # One guaranteed character per required class.
    required = [
        secrets.choice(string.ascii_lowercase),
        secrets.choice(string.ascii_uppercase),
        secrets.choice(string.digits),
    ]

    pool = string.ascii_letters + string.digits
    if use_special:
        pool += string.punctuation
        required.append(secrets.choice(string.punctuation))

    if length < len(required):
        raise ValueError(f"length must be at least {len(required)}")

    password = required + [secrets.choice(pool) for _ in range(length - len(required))]

    # Shuffle so the guaranteed characters aren't always at the front.
    secrets.SystemRandom().shuffle(password)

    return ''.join(password)

def generate_passwords(count=10, length=16):
    """Generate, print and return *count* passwords of the given length."""
    passwords = [generate_password(length) for _ in range(count)]

    for index, pwd in enumerate(passwords, 1):
        print(f"{index}. {pwd}")

    return passwords

if __name__ == "__main__":
    # One password.
    print(f"Generated password: {generate_password()}")

    # Several at once.
    generate_passwords(count=5)

15. QR 码生成器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#!/usr/bin/env python3
"""
QR 码生成器
用途:生成二维码图片
"""

import qrcode
from pathlib import Path

def generate_qr(data, output_file, size=10):
    """Render *data* as a QR code image saved to *output_file*.

    Args:
        data: Payload to encode (e.g. a URL).
        output_file: Destination image path.
        size: Pixel size of each QR module ("box").
    """
    code = qrcode.QRCode(
        version=1,
        error_correction=qrcode.constants.ERROR_CORRECT_L,
        box_size=size,
        border=4,
    )
    code.add_data(data)
    code.make(fit=True)  # grow the version automatically if the data needs it

    image = code.make_image(fill_color="black", back_color="white")
    image.save(output_file)
    print(f"Generated QR code: {output_file}")

def generate_qr_batch(data_list, output_dir):
    """Generate numbered QR codes (qr_001.png, ...) for each payload."""
    Path(output_dir).mkdir(parents=True, exist_ok=True)

    for index, payload in enumerate(data_list, 1):
        generate_qr(payload, f"{output_dir}/qr_{index:03d}.png")

if __name__ == "__main__":
    # Single QR code.
    generate_qr('https://blog.sharezone.cn', 'website_qr.png')

    # Batch generation.
    urls = [
        'https://google.com',
        'https://github.com',
        'https://python.org'
    ]
    generate_qr_batch(urls, './qrcodes')

16. Markdown 转 HTML

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
#!/usr/bin/env python3
"""
Markdown 转 HTML
用途:批量转换 Markdown 文件为 HTML
"""

import markdown
from pathlib import Path

def convert_md_to_html(md_file, html_file=None):
    """Convert one Markdown file to a standalone HTML page.

    Args:
        md_file: Source .md path.
        html_file: Destination path; defaults to the source path with its
            extension swapped to .html.  (The original used
            str.replace('.md', ...), which corrupts any path containing
            '.md' elsewhere, e.g. 'notes.md.bak/a.md'.)
    """
    if html_file is None:
        html_file = str(Path(md_file).with_suffix('.html'))

    with open(md_file, 'r', encoding='utf-8') as f:
        md_content = f.read()

    # Convert the Markdown body.
    html_content = markdown.markdown(
        md_content,
        extensions=['extra', 'codehilite', 'toc']
    )

    # Wrap the fragment in a minimal self-contained page.
    html_template = f"""<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>{Path(md_file).stem}</title>
<style>
body {{ font-family: Arial, sans-serif; max-width: 800px; margin: 0 auto; padding: 20px; }}
code {{ background: #f4f4f4; padding: 2px 5px; }}
pre {{ background: #f4f4f4; padding: 10px; overflow-x: auto; }}
</style>
</head>
<body>
{html_content}
</body>
</html>"""

    with open(html_file, 'w', encoding='utf-8') as f:
        f.write(html_template)

    print(f"Converted: {md_file} -> {html_file}")

def batch_convert(input_dir, output_dir=None):
    """Convert every *.md in *input_dir* to HTML files in *output_dir*
    (defaults to writing next to the sources)."""
    target = Path(output_dir) if output_dir is not None else Path(input_dir)
    target.mkdir(parents=True, exist_ok=True)

    for md_file in Path(input_dir).glob('*.md'):
        html_file = target / f"{md_file.stem}.html"
        convert_md_to_html(str(md_file), str(html_file))

if __name__ == "__main__":
    # Single file.
    convert_md_to_html('article.md')

    # Whole directory.
    batch_convert('./posts', './html')

七、总结

本文介绍了 16 个实用的 Python 自动化脚本,涵盖:

文件处理:批量重命名、图片转换、PDF 处理、Excel 操作
网络请求:网站监控、API 调用、网页抓取
系统运维:服务器监控、日志分析、自动备份
数据处理:CSV 清洗、JSON 转换、数据可视化
实用工具:密码生成、QR 码、Markdown 转换

所有脚本都经过测试,可直接使用或根据需求修改。


最后更新: 2026-03-12

标签: #Python #自动化 #脚本 #效率工具 #运维

分类: 编程/Python

依赖安装:

1
pip install pillow pypdf2 pandas openpyxl requests beautifulsoup4 psutil boto3 matplotlib seaborn qrcode markdown