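I. Introduction

Python is often called a "glue language". Its concise syntax and rich library ecosystem make it a first choice for automation, and it offers efficient solutions for batch file processing, data analysis, network requests, and system operations alike.

This article collects 16 practical Python automation scripts for everyday work. Each one is self-contained and can be run directly once its dependencies are installed.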
II. File Processing Automation

1. Batch Rename Files

```python
"""
Batch rename files.
Purpose: rename files by rule, e.g. add a prefix or suffix, replace characters, add a sequence number.
"""
import re
from datetime import datetime
from pathlib import Path


def batch_rename(directory, pattern, replacement, prefix='', suffix='', start_num=1):
    """
    Batch rename the files in a directory.

    Args:
        directory: target directory
        pattern: regular expression applied to the file name without its extension
        replacement: replacement text
        prefix: file name prefix
        suffix: file name suffix (inserted before the extension)
        start_num: first sequence number
    """
    dir_path = Path(directory)
    # Snapshot and sort the files first: renaming while iterating over
    # iterdir() can revisit entries, and directories must not use up numbers.
    files = sorted(f for f in dir_path.iterdir() if f.is_file())

    for i, file in enumerate(files, start=start_num):
        # Substitute in the stem only, so the extension is preserved.
        new_stem = re.sub(pattern, replacement, file.stem)
        new_name = f"{i:03d}_{prefix}{new_stem}{suffix}{file.suffix}"
        new_path = dir_path / new_name

        if not new_path.exists():
            file.rename(new_path)
            print(f"Renamed: {file.name} -> {new_name}")
        else:
            print(f"Skipped (exists): {new_name}")


if __name__ == "__main__":
    # Example: replace runs of whitespace with "_" and add a date prefix.
    batch_rename(
        directory="./documents",
        pattern=r"\s+",
        replacement="_",
        prefix=f"{datetime.now().strftime('%Y%m%d')}_",
    )
```
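Renames are hard to undo, so it can help to compute the rename plan first and apply it only after a review. The following is a minimal sketch of that idea, not part of the script above: `plan_renames` and `apply_plan` are hypothetical helper names, and the numbering scheme simply mirrors the `NNN_prefix_name` layout used in this section.

```python
import re
import tempfile
from pathlib import Path


def plan_renames(directory, pattern, replacement, prefix="", start_num=1):
    """Return a list of (old_path, new_path) pairs without touching the disk."""
    dir_path = Path(directory)
    files = sorted(f for f in dir_path.iterdir() if f.is_file())
    plan = []
    for i, file in enumerate(files, start=start_num):
        new_stem = re.sub(pattern, replacement, file.stem)
        plan.append((file, dir_path / f"{i:03d}_{prefix}{new_stem}{file.suffix}"))
    return plan


def apply_plan(plan, dry_run=True):
    """Print every planned rename; perform it only when dry_run is False."""
    for old, new in plan:
        action = "Would rename" if dry_run else "Renaming"
        print(f"{action}: {old.name} -> {new.name}")
        if not dry_run and not new.exists():
            old.rename(new)


# Demo in a throwaway directory so that nothing real is renamed.
with tempfile.TemporaryDirectory() as tmp:
    for name in ("b notes.txt", "a draft.md"):
        (Path(tmp) / name).touch()
    demo_plan = plan_renames(tmp, r"\s+", "_")
    apply_plan(demo_plan)  # dry run: prints the plan only
    planned_names = [new.name for _, new in demo_plan]
```

Calling `apply_plan(plan, dry_run=False)` on a reviewed plan performs the renames; keeping the dry run as the default means a mistyped pattern costs nothing.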
2. Batch Convert Image Formats

```python
"""
Batch convert image formats.
Purpose: convert images to a target format in bulk, optionally changing size and quality.
"""
from pathlib import Path

from PIL import Image


def convert_images(input_dir, output_dir, fmt='JPEG', quality=90, resize=None):
    """
    Batch convert image formats.

    Args:
        input_dir: input directory
        output_dir: output directory
        fmt: target format (JPEG, PNG, WEBP, ...)
        quality: quality (1-100); ignored by formats that do not use it
        resize: new size as (width, height)
    """
    input_path = Path(input_dir)
    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    image_extensions = {'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp', '.tiff'}
    converted = 0

    for file in sorted(input_path.iterdir()):
        if not file.is_file() or file.suffix.lower() not in image_extensions:
            continue
        try:
            with Image.open(file) as img:
                if resize:
                    # resize() stretches to exactly (width, height);
                    # it does not preserve the aspect ratio.
                    img = img.resize(resize, Image.Resampling.LANCZOS)
                if fmt.upper() == 'JPEG':
                    img = img.convert('RGB')  # JPEG has no alpha channel

                output_file = output_path / f"{file.stem}.{fmt.lower()}"
                img.save(output_file, format=fmt, quality=quality)

            print(f"Converted: {file.name} -> {output_file.name}")
            converted += 1
        except Exception as e:
            print(f"Error converting {file.name}: {e}")

    print(f"\nTotal converted: {converted} images")


if __name__ == "__main__":
    convert_images(
        input_dir="./photos",
        output_dir="./converted",
        fmt='WEBP',
        quality=85,
        resize=(1920, 1080),
    )
```
3. Merge and Split PDFs

```python
"""
Merge and split PDFs.
Purpose: merge several PDF files into one, or split one PDF into parts.
"""
from pathlib import Path

from PyPDF2 import PdfMerger, PdfReader, PdfWriter


def merge_pdfs(input_dir, output_file):
    """Merge all PDFs in a directory, in file-name order."""
    pdf_files = sorted(Path(input_dir).glob("*.pdf"))
    merger = PdfMerger()

    for pdf in pdf_files:
        print(f"Adding: {pdf.name}")
        merger.append(str(pdf))

    merger.write(output_file)
    merger.close()
    print(f"Merged {len(pdf_files)} PDFs into {output_file}")


def split_pdf(input_file, output_dir, pages_per_file=1):
    """Split a PDF into files of pages_per_file pages each."""
    reader = PdfReader(input_file)
    Path(output_dir).mkdir(parents=True, exist_ok=True)
    total_pages = len(reader.pages)

    for i in range(0, total_pages, pages_per_file):
        last = min(i + pages_per_file, total_pages)
        writer = PdfWriter()
        for j in range(i, last):
            writer.add_page(reader.pages[j])

        # Name the file after the pages it really contains (1-based),
        # so the final chunk is not labelled past the end of the document.
        output_file = Path(output_dir) / f"page_{i + 1}-{last}.pdf"
        with open(output_file, 'wb') as f:
            writer.write(f)
        print(f"Created: {output_file.name}")


if __name__ == "__main__":
    merge_pdfs("./pdfs", "merged.pdf")
```
4. Excel Data Processing

```python
"""
Excel data processing.
Purpose: merge several Excel files, clean data, generate a summary report.
"""
from pathlib import Path

import pandas as pd


def merge_excel_files(input_dir, output_file, sheet_name='Sheet1'):
    """Merge several Excel files into one."""
    excel_files = sorted(Path(input_dir).glob("*.xlsx"))
    if not excel_files:
        # pd.concat raises ValueError on an empty list, so stop early.
        print(f"No .xlsx files found in {input_dir}")
        return

    df_list = []
    for file in excel_files:
        print(f"Reading: {file.name}")
        df_list.append(pd.read_excel(file, sheet_name=sheet_name))

    merged_df = pd.concat(df_list, ignore_index=True)
    merged_df.to_excel(output_file, index=False)
    print(f"Merged {len(excel_files)} files into {output_file}")


def clean_excel_data(input_file, output_file):
    """Clean Excel data."""
    df = pd.read_excel(input_file)

    df = df.dropna(how='all')   # drop rows that are entirely empty
    df = df.drop_duplicates()   # drop duplicate rows
    df = df.fillna('')          # replace remaining missing values with ''
    df = df.loc[:, (df != '').any(axis=0)]  # drop columns that are entirely empty

    df.to_excel(output_file, index=False)
    print(f"Cleaned data saved to {output_file}")


def generate_summary(input_file, output_file):
    """Generate a one-row summary of the data."""
    df = pd.read_excel(input_file)

    summary = {
        'total_rows': [len(df)],
        'total_columns': [len(df.columns)],
        # Column labels are not always strings, so convert before joining.
        'column_names': [', '.join(map(str, df.columns))],
    }

    for col in df.select_dtypes(include=['number']).columns:
        summary[f'{col}_mean'] = [df[col].mean()]
        summary[f'{col}_median'] = [df[col].median()]
        summary[f'{col}_std'] = [df[col].std()]

    pd.DataFrame(summary).to_excel(output_file, index=False)
    print(f"Summary saved to {output_file}")


if __name__ == "__main__":
    generate_summary("data.xlsx", "summary.xlsx")
```
III. Network Request Automation

5. Batch Check Website Availability

```python
"""
Batch check website availability.
Purpose: monitor the response status and response time of several websites.
"""
import concurrent.futures
import csv
import time

import requests


def check_website(url, timeout=10):
    """Check a single website."""
    start = time.perf_counter()
    try:
        response = requests.get(url, timeout=timeout)
        return {
            'url': url,
            'status': response.status_code,
            'response_time': time.perf_counter() - start,
            # A 4xx/5xx answer means the site responded but is not healthy.
            'success': response.ok,
            'error': None,
        }
    except requests.RequestException as e:
        return {
            'url': url,
            'status': None,
            'response_time': None,
            'success': False,
            'error': str(e),
        }


def batch_check_websites(urls, max_workers=10):
    """Check several websites concurrently."""
    results = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(check_website, url) for url in urls]
        for future in concurrent.futures.as_completed(futures):
            result = future.result()
            results.append(result)

            status = "✅" if result['success'] else "❌"
            if result['response_time'] is not None:
                time_str = f"{result['response_time']:.2f}s"
            else:
                time_str = "N/A"
            print(f"{status} {result['url']} - {result['status']} - {time_str}")
    return results


def save_results(results, output_file):
    """Save the results to a CSV file."""
    fieldnames = ['url', 'status', 'response_time', 'success', 'error']
    with open(output_file, 'w', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(results)


if __name__ == "__main__":
    websites = [
        'https://www.google.com',
        'https://www.github.com',
        'https://www.stackoverflow.com',
        'https://www.python.org',
        'https://blog.sharezone.cn',
    ]
    results = batch_check_websites(websites)
    save_results(results, 'website_status.csv')

    success_count = sum(1 for r in results if r['success'])
    print(f"\nTotal: {len(results)}, Success: {success_count}, "
          f"Failed: {len(results) - success_count}")
```
6. Batch API Calls

```python
"""
Batch API calls.
Purpose: call a REST API in bulk with retries, rate limiting and error handling.
"""
import json
import time

import requests


class APIClient:
    def __init__(self, base_url, api_key=None, rate_limit=10, timeout=10):
        self.base_url = base_url.rstrip('/')
        self.api_key = api_key
        self.rate_limit = rate_limit  # maximum requests per second
        self.timeout = timeout
        self.last_request = 0

    def _rate_limit_wait(self):
        """Sleep just long enough to stay under the rate limit."""
        elapsed = time.time() - self.last_request
        if elapsed < 1 / self.rate_limit:
            time.sleep(1 / self.rate_limit - elapsed)
        self.last_request = time.time()

    def request(self, method, endpoint, **kwargs):
        """Send one request; return the decoded JSON, or None on failure."""
        self._rate_limit_wait()
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        # Copy the headers so the caller's dict is never modified.
        headers = dict(kwargs.pop('headers', None) or {})
        if self.api_key:
            headers['Authorization'] = f'Bearer {self.api_key}'
        kwargs.setdefault('timeout', self.timeout)

        try:
            response = requests.request(method, url, headers=headers, **kwargs)
            response.raise_for_status()
            return response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers a response body that is not valid JSON.
            print(f"Error: {e}")
            return None

    def batch_get(self, endpoints, max_retries=3):
        """Send GET requests for a list of endpoints."""
        results = []
        for i, endpoint in enumerate(endpoints, 1):
            print(f"[{i}/{len(endpoints)}] Requesting: {endpoint}")
            for attempt in range(max_retries):
                result = self.request('GET', endpoint)
                # Compare with None: an empty list or dict is a valid answer.
                if result is not None:
                    results.append({'endpoint': endpoint, 'success': True, 'data': result})
                    break
                if attempt < max_retries - 1:
                    print(f"  Retry {attempt + 1}/{max_retries}")
                    time.sleep(2 ** attempt)  # exponential backoff: 1s, 2s, ...
                else:
                    results.append({'endpoint': endpoint, 'success': False, 'data': None})
        return results


if __name__ == "__main__":
    client = APIClient(base_url='https://api.github.com', rate_limit=5)

    repos = ['johnzok/repo1', 'johnzok/repo2', 'johnzok/repo3']
    endpoints = [f'repos/{repo}' for repo in repos]
    results = client.batch_get(endpoints)

    with open('api_results.json', 'w', encoding='utf-8') as f:
        json.dump(results, f, indent=2)
```
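The retry loop in `batch_get` waits `2 ** attempt` seconds between attempts. As a rough sketch, the same exponential backoff can be pulled out into a reusable helper with an upper bound and optional random jitter. The names `backoff_delays` and `retry` and the `base`, `cap` and `jitter` parameters are assumptions made for this illustration; they are not part of the script above.

```python
import random
import time


def backoff_delays(max_retries, base=1.0, cap=30.0, jitter=0.0):
    """Yield the wait before each retry: base * 2**attempt, capped, plus jitter."""
    for attempt in range(max_retries - 1):  # no wait after the final attempt
        delay = min(cap, base * 2 ** attempt)
        yield delay + random.uniform(0, jitter)


def retry(func, max_retries=3, sleep=time.sleep, **backoff_kwargs):
    """Call func() until it succeeds or max_retries attempts are used up."""
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")
    delays = backoff_delays(max_retries, **backoff_kwargs)
    last_error = None
    for _ in range(max_retries):
        try:
            return func()
        except Exception as exc:  # narrow this to the errors worth retrying
            last_error = exc
            delay = next(delays, None)
            if delay is not None:
                sleep(delay)
    raise last_error
```

The `sleep` argument exists so that the waiting can be replaced in tests; a small non-zero `jitter` keeps many clients from retrying at the same instant.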
7. Web Scraping

```python
"""
Web scraping.
Purpose: extract structured data from web pages.
"""
from datetime import datetime
from urllib.parse import urljoin

import pandas as pd
import requests
from bs4 import BeautifulSoup


def fetch_soup(url, timeout=10):
    """Download a page and parse it; raises on HTTP errors."""
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()
    return BeautifulSoup(response.text, 'html.parser')


def text_or_none(node, selector):
    """Return the stripped text of the first match, or None if there is none."""
    elem = node.select_one(selector)
    return elem.get_text(strip=True) if elem else None


def scrape_articles(url):
    """Scrape a list of articles. The CSS selectors must match the target site."""
    soup = fetch_soup(url)
    articles = []

    for item in soup.select('.article-item'):
        title = text_or_none(item, '.title')
        link_elem = item.select_one('a')

        if title and link_elem:
            articles.append({
                'title': title,
                # Resolve relative links against the page URL.
                'url': urljoin(url, link_elem.get('href') or ''),
                'date': text_or_none(item, '.date'),
                'scraped_at': datetime.now().isoformat(),
            })
    return articles


def scrape_product_details(url):
    """Scrape the details of one product page."""
    soup = fetch_soup(url)
    return {
        'name': text_or_none(soup, '.product-name'),
        'price': text_or_none(soup, '.price'),
        'description': text_or_none(soup, '.description'),
        'rating': text_or_none(soup, '.rating'),
    }


def save_to_csv(data, filename):
    """Save the records to a CSV file."""
    if data:
        df = pd.DataFrame(data)
        # utf-8-sig adds a BOM so that Excel detects the encoding.
        df.to_csv(filename, index=False, encoding='utf-8-sig')
        print(f"Saved {len(data)} records to {filename}")


if __name__ == "__main__":
    articles = scrape_articles('https://blog.example.com/articles')
    save_to_csv(articles, 'articles.csv')
```
IV. System Operations Automation

8. Server Monitoring Script

```python
"""
Server monitoring script.
Purpose: monitor CPU, memory, disk, network and other system resources.
"""
import json
import smtplib
import socket
import time
from datetime import datetime
from email.mime.text import MIMEText

import psutil


def get_system_info():
    """Collect a snapshot of system information."""
    cpu_freq = psutil.cpu_freq()
    memory = psutil.virtual_memory()
    net = psutil.net_io_counters()

    info = {
        'timestamp': datetime.now().isoformat(),
        'hostname': socket.gethostname(),  # psutil has no hostname() function
        'cpu': {
            'percent': psutil.cpu_percent(interval=1),
            'cores': psutil.cpu_count(),
            'freq': cpu_freq.current if cpu_freq else None,
        },
        'memory': {
            'total': memory.total,
            'available': memory.available,
            'percent': memory.percent,
        },
        'disk': [],
        'network': {
            'bytes_sent': net.bytes_sent,
            'bytes_recv': net.bytes_recv,
        },
    }

    for partition in psutil.disk_partitions():
        try:
            usage = psutil.disk_usage(partition.mountpoint)
        except OSError:
            # Includes PermissionError and drives that are not ready.
            continue
        info['disk'].append({
            'device': partition.device,
            'mountpoint': partition.mountpoint,
            'percent': usage.percent,
        })
    return info


def check_thresholds(info, thresholds=None):
    """Compare the snapshot against alert thresholds (percent)."""
    if thresholds is None:
        thresholds = {'cpu_percent': 80, 'memory_percent': 80, 'disk_percent': 90}

    alerts = []
    if info['cpu']['percent'] > thresholds['cpu_percent']:
        alerts.append(f"CPU usage too high: {info['cpu']['percent']}%")
    if info['memory']['percent'] > thresholds['memory_percent']:
        alerts.append(f"Memory usage too high: {info['memory']['percent']}%")
    for disk in info['disk']:
        if disk['percent'] > thresholds['disk_percent']:
            alerts.append(f"Disk {disk['mountpoint']} usage too high: {disk['percent']}%")
    return alerts


def send_alert(email_to, alerts, smtp_host=None):
    """Send an alert email; without smtp_host the alert is only printed."""
    msg = MIMEText('\n'.join(alerts))
    msg['Subject'] = 'Server monitoring alert'
    msg['From'] = 'monitor@example.com'
    msg['To'] = email_to

    if smtp_host:
        with smtplib.SMTP(smtp_host) as server:
            server.send_message(msg)
        print("Alert sent:", alerts)
    else:
        print("Alert (no SMTP host configured, not sent):", alerts)


def monitor_loop(interval=60):
    """Monitor continuously and append each snapshot to a daily log file."""
    while True:
        info = get_system_info()
        alerts = check_thresholds(info)
        if alerts:
            send_alert('admin@example.com', alerts)

        with open(f"monitor_{datetime.now().strftime('%Y%m%d')}.json", 'a') as f:
            f.write(json.dumps(info) + '\n')
        time.sleep(interval)


if __name__ == "__main__":
    info = get_system_info()
    print(json.dumps(info, indent=2))

    alerts = check_thresholds(info)
    if alerts:
        print("\n⚠️ Alerts:")
        for alert in alerts:
            print(f"  - {alert}")
```
9. Log Analysis Script

```python
"""
Log analysis script.
Purpose: analyze log files, extract errors, compute access statistics.
"""
import re
from collections import Counter
from datetime import datetime

# client IP ... "METHOD URL HTTP/x" STATUS BYTES ("-" when no body was sent)
ACCESS_PATTERN = re.compile(
    r'(\d+\.\d+\.\d+\.\d+).*?"(\w+) (.*?) HTTP.*?" (\d+) (\d+|-)'
)
# Matches "[error]"-style levels (nginx) and upper-case ERROR/CRITICAL, then
# captures the text up to the first colon as a rough error type. For nginx
# lines that text is the process id, so adapt the pattern to your log format.
ERROR_PATTERN = re.compile(
    r'(?:\[(?:error|crit|alert|emerg)\]|\b(?:ERROR|CRITICAL)\b)\s*([^:]*)'
)


def analyze_access_log(log_file):
    """Analyze an Nginx/Apache access log."""
    ips, urls, status_codes, methods = Counter(), Counter(), Counter(), Counter()
    total_bytes = 0

    with open(log_file, 'r', encoding='utf-8', errors='replace') as f:
        for line in f:
            match = ACCESS_PATTERN.search(line)
            if not match:
                continue
            ip, method, url, status, bytes_sent = match.groups()
            ips[ip] += 1
            urls[url] += 1
            status_codes[status] += 1
            methods[method] += 1
            if bytes_sent != '-':
                total_bytes += int(bytes_sent)

    return {
        'total_requests': sum(ips.values()),
        'unique_ips': len(ips),
        'top_ips': ips.most_common(10),
        'top_urls': urls.most_common(10),
        'status_codes': dict(status_codes),
        'methods': dict(methods),
        'total_bytes': total_bytes,
    }


def analyze_error_log(log_file):
    """Analyze an error log."""
    errors = Counter()
    error_lines = []

    with open(log_file, 'r', encoding='utf-8', errors='replace') as f:
        for line in f:
            match = ERROR_PATTERN.search(line)
            if match:
                errors[match.group(1).strip() or 'unknown'] += 1
                error_lines.append(line.strip())

    return {
        'total_errors': len(error_lines),
        'error_types': dict(errors.most_common(10)),
        'recent_errors': error_lines[-20:],
    }


def generate_report(analysis, output_file):
    """Write the access analysis as a Markdown report."""
    with open(output_file, 'w', encoding='utf-8') as f:
        f.write("# Log Analysis Report\n")
        f.write(f"Generated at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")

        f.write("## Access Statistics\n")
        f.write(f"- Total requests: {analysis['total_requests']}\n")
        f.write(f"- Unique IPs: {analysis['unique_ips']}\n")
        f.write(f"- Total traffic: {analysis['total_bytes'] / 1024 / 1024:.2f} MB\n\n")

        f.write("## Top 10 IPs\n")
        for ip, count in analysis['top_ips']:
            f.write(f"- {ip}: {count} requests\n")

        f.write("\n## Status Code Distribution\n")
        for code, count in analysis['status_codes'].items():
            f.write(f"- {code}: {count} requests\n")


if __name__ == "__main__":
    access_analysis = analyze_access_log('/var/log/nginx/access.log')
    generate_report(access_analysis, 'access_report.md')

    error_analysis = analyze_error_log('/var/log/nginx/error.log')
    print(f"Total errors: {error_analysis['total_errors']}")
```
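To see what the access-log pattern extracts, it helps to run it against individual lines. The sketch below uses a pattern like the one in `analyze_access_log`, with the byte count allowed to be `-`. Both sample lines are made-up entries in the common combined log format, and `parse_line` is a hypothetical helper name.

```python
import re

ACCESS_PATTERN = re.compile(
    r'(\d+\.\d+\.\d+\.\d+).*?"(\w+) (.*?) HTTP.*?" (\d+) (\d+|-)'
)


def parse_line(line):
    """Return (ip, method, url, status, bytes) or None when the line does not match."""
    match = ACCESS_PATTERN.search(line)
    return match.groups() if match else None


# Made-up sample lines for illustration only.
sample = ('203.0.113.7 - - [12/Mar/2026:10:15:32 +0000] '
          '"GET /index.html HTTP/1.1" 200 5123 "-" "curl/8.0"')
no_body = ('198.51.100.2 - - [12/Mar/2026:10:15:33 +0000] '
           '"HEAD /health HTTP/1.1" 304 -')

print(parse_line(sample))   # ('203.0.113.7', 'GET', '/index.html', '200', '5123')
print(parse_line(no_body))  # ('198.51.100.2', 'HEAD', '/health', '304', '-')
print(parse_line("not a log line"))  # None
```

All captured groups are strings, which is why the analysis converts the byte count with `int()` before summing. The pattern only recognizes IPv4 client addresses, so IPv6 clients are silently skipped.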
10. Automatic Backup Script

```python
"""
Automatic backup script.
Purpose: back up files and databases, with compression and upload to S3.
"""
import gzip
import os
import shutil
import subprocess
from datetime import datetime
from pathlib import Path

import boto3


class BackupManager:
    def __init__(self, backup_dir, retention_days=30):
        self.backup_dir = Path(backup_dir)
        self.backup_dir.mkdir(parents=True, exist_ok=True)
        self.retention_days = retention_days

    def backup_directory(self, source_dir, compress=True):
        """Back up a directory."""
        source = Path(source_dir)
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_name = f"{source.name}_{timestamp}"

        if compress:
            backup_file = self.backup_dir / f"{backup_name}.tar.gz"
            # check=True raises if tar fails instead of reporting success.
            subprocess.run(
                ['tar', '-czf', str(backup_file), '-C', str(source.parent), source.name],
                check=True,
            )
        else:
            backup_file = self.backup_dir / backup_name
            shutil.copytree(source, backup_file)

        print(f"Backed up: {source} -> {backup_file}")
        return backup_file

    def backup_mysql(self, database, user, password, host='localhost'):
        """Back up a MySQL database."""
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_file = self.backup_dir / f"{database}_{timestamp}.sql.gz"

        # No shell and no password on the command line: mysqldump reads the
        # password from the MYSQL_PWD environment variable, and its output is
        # compressed in Python. This avoids shell injection through arguments.
        env = {**os.environ, 'MYSQL_PWD': password}
        cmd = ['mysqldump', f'--user={user}', f'--host={host}', database]
        with subprocess.Popen(cmd, stdout=subprocess.PIPE, env=env) as proc, \
                gzip.open(backup_file, 'wb') as out:
            shutil.copyfileobj(proc.stdout, out)

        if proc.returncode != 0:
            backup_file.unlink(missing_ok=True)
            raise RuntimeError(f"mysqldump failed with exit code {proc.returncode}")

        print(f"Backed up MySQL: {database} -> {backup_file}")
        return backup_file

    def upload_to_s3(self, file_path, bucket, prefix='backups'):
        """Upload a file to S3."""
        s3 = boto3.client('s3')
        key = f"{prefix}/{Path(file_path).name}"
        s3.upload_file(str(file_path), bucket, key)
        print(f"Uploaded to S3: s3://{bucket}/{key}")

    def cleanup_old_backups(self):
        """Delete backups older than retention_days."""
        cutoff = datetime.now().timestamp() - (self.retention_days * 86400)
        for entry in self.backup_dir.iterdir():
            if entry.stat().st_mtime < cutoff:
                if entry.is_dir():
                    shutil.rmtree(entry)  # uncompressed backups are directories
                else:
                    entry.unlink()
                print(f"Deleted old backup: {entry}")


if __name__ == "__main__":
    backup_mgr = BackupManager('/backup', retention_days=30)
    backup_mgr.backup_directory('/var/www/html')
    backup_mgr.backup_mysql('mydb', 'root', 'password')
    backup_mgr.cleanup_old_backups()
```
V. Data Processing Automation

11. CSV Data Cleaning

```python
"""
CSV data cleaning.
Purpose: clean CSV data by handling missing values, duplicates and format conversion.
"""
import pandas as pd


def clean_csv(input_file, output_file):
    """Clean a CSV file."""
    df = pd.read_csv(input_file)
    print(f"Original shape: {df.shape}")

    # Drop rows and columns that are completely empty.
    df = df.dropna(how='all').dropna(axis=1, how='all')

    duplicates = int(df.duplicated().sum())
    df = df.drop_duplicates()
    print(f"Removed {duplicates} duplicate rows")

    # Parse non-numeric columns whose name suggests a date or time.
    # This runs before filling so that placeholders never reach the parser;
    # values that cannot be parsed become NaT.
    for col in df.columns:
        looks_like_date = 'date' in str(col).lower() or 'time' in str(col).lower()
        if looks_like_date and not pd.api.types.is_numeric_dtype(df[col]):
            df[col] = pd.to_datetime(df[col], errors='coerce')

    for col in df.columns:
        missing = int(df[col].isna().sum())
        if missing == 0:
            continue
        if pd.api.types.is_numeric_dtype(df[col]):
            df[col] = df[col].fillna(df[col].median())
        elif pd.api.types.is_datetime64_any_dtype(df[col]):
            continue  # leave missing dates as NaT
        else:
            df[col] = df[col].fillna('Unknown')
        print(f"Filled {missing} missing values in {col}")

    print(f"Cleaned shape: {df.shape}")
    df.to_csv(output_file, index=False)
    print(f"Saved to {output_file}")


if __name__ == "__main__":
    clean_csv('raw_data.csv', 'clean_data.csv')
```
12. JSON Data Conversion

```python
"""
JSON data conversion.
Purpose: convert JSON to CSV or Excel, and flatten nested JSON.
"""
import json

import pandas as pd


def load_json_as_dataframe(json_file):
    """Load a JSON file; a single object becomes a one-row DataFrame."""
    with open(json_file, 'r', encoding='utf-8') as f:
        data = json.load(f)
    return pd.DataFrame(data if isinstance(data, list) else [data])


def json_to_csv(json_file, csv_file):
    """Convert JSON to CSV."""
    load_json_as_dataframe(json_file).to_csv(csv_file, index=False)
    print(f"Converted {json_file} to {csv_file}")


def json_to_excel(json_file, excel_file):
    """Convert JSON to Excel."""
    load_json_as_dataframe(json_file).to_excel(excel_file, index=False)
    print(f"Converted {json_file} to {excel_file}")


def flatten_json(nested_json):
    """Flatten nested JSON into dotted keys such as "user.tags.0"."""

    def flatten(obj, parent_key='', sep='.'):
        items = []
        if isinstance(obj, dict):
            for k, v in obj.items():
                new_key = f"{parent_key}{sep}{k}" if parent_key else k
                items.extend(flatten(v, new_key, sep=sep).items())
        elif isinstance(obj, list):
            for i, v in enumerate(obj):
                new_key = f"{parent_key}{sep}{i}" if parent_key else str(i)
                items.extend(flatten(v, new_key, sep=sep).items())
        else:
            items.append((parent_key, obj))
        return dict(items)

    # A top-level list is treated as a list of records, flattened one by one.
    if isinstance(nested_json, list):
        return [flatten(item) for item in nested_json]
    return flatten(nested_json)


if __name__ == "__main__":
    json_to_csv('data.json', 'data.csv')

    with open('nested.json', 'r', encoding='utf-8') as f:
        nested = json.load(f)

    flat = flatten_json(nested)
    with open('flat.json', 'w', encoding='utf-8') as f:
        json.dump(flat, f, indent=2, ensure_ascii=False)
```
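A small worked example makes the key naming of the flattening step concrete. The block below is a compact, standalone re-implementation of the same recursive idea (a sketch for illustration, with made-up input data), so that it can be run without the rest of the script.

```python
def flatten(obj, parent_key="", sep="."):
    """Flatten nested dicts and lists into one dict with dotted keys."""
    items = {}
    if isinstance(obj, dict):
        for key, value in obj.items():
            new_key = f"{parent_key}{sep}{key}" if parent_key else str(key)
            items.update(flatten(value, new_key, sep))
    elif isinstance(obj, list):
        for index, value in enumerate(obj):
            new_key = f"{parent_key}{sep}{index}" if parent_key else str(index)
            items.update(flatten(value, new_key, sep))
    else:
        items[parent_key] = obj  # leaf value: store it under the accumulated key
    return items


record = {"user": {"name": "Ada", "tags": ["a", "b"]}, "active": True}
flat_record = flatten(record)
print(flat_record)
# {'user.name': 'Ada', 'user.tags.0': 'a', 'user.tags.1': 'b', 'active': True}
```

One consequence of this approach is that empty containers disappear: `flatten({"a": {}})` returns `{}`, because only leaf values produce keys. If empty objects matter downstream, they need a separate rule.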
13. Data Visualization

```python
"""
Data visualization.
Purpose: generate charts automatically and save them as image files.
"""
from pathlib import Path

import matplotlib
matplotlib.use('Agg')  # render to files without needing a display
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns


def generate_charts(data_file, output_dir):
    """Generate charts for a CSV file."""
    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    df = pd.read_csv(data_file)
    sns.set_style("whitegrid")

    # Histograms for up to five numeric columns.
    numeric_cols = df.select_dtypes(include=['number']).columns
    for col in numeric_cols[:5]:
        plt.figure(figsize=(10, 6))
        sns.histplot(df[col], kde=True)
        plt.title(f'{col} Distribution')
        plt.savefig(output_path / f'{col}_hist.png', dpi=300, bbox_inches='tight')
        plt.close()

    # Correlation heatmap when there are at least two numeric columns.
    if len(numeric_cols) > 1:
        plt.figure(figsize=(12, 10))
        corr = df[numeric_cols].corr()
        sns.heatmap(corr, annot=True, cmap='coolwarm', center=0)
        plt.title('Correlation Heatmap')
        plt.savefig(output_path / 'correlation.png', dpi=300, bbox_inches='tight')
        plt.close()

    # Bar charts of the ten most frequent values for up to three text columns.
    categorical_cols = df.select_dtypes(include=['object']).columns
    for col in categorical_cols[:3]:
        plt.figure(figsize=(10, 6))
        df[col].value_counts().head(10).plot(kind='bar')
        plt.title(f'Top 10 {col}')
        plt.xticks(rotation=45)
        plt.savefig(output_path / f'{col}_bar.png', dpi=300, bbox_inches='tight')
        plt.close()

    print(f"Generated charts in {output_dir}")


if __name__ == "__main__":
    generate_charts('data.csv', './charts')
```
VI. Utility Scripts

14. Password Generator

```python
"""
Password generator.
Purpose: generate strong random passwords.
"""
import secrets
import string


def generate_password(length=16, use_special=True):
    """Generate a random password with at least one character of each class."""
    required = [string.ascii_lowercase, string.ascii_uppercase, string.digits]
    if use_special:
        required.append(string.punctuation)

    if length < len(required):
        # A shorter password cannot contain one character of every class.
        raise ValueError(f"length must be at least {len(required)}")

    characters = ''.join(required)
    # One guaranteed character per class, the rest from the full alphabet.
    password = [secrets.choice(pool) for pool in required]
    password += [secrets.choice(characters) for _ in range(length - len(required))]

    # Shuffle so the guaranteed characters are not always at the front.
    secrets.SystemRandom().shuffle(password)
    return ''.join(password)


def generate_passwords(count=10, length=16, use_special=True):
    """Generate several passwords."""
    passwords = [generate_password(length, use_special) for _ in range(count)]
    for i, pwd in enumerate(passwords, 1):
        print(f"{i}. {pwd}")
    return passwords


if __name__ == "__main__":
    print(f"Generated password: {generate_password()}")
    generate_passwords(count=5)
```
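How strong is a password of a given length? A rough upper bound on its entropy is `length * log2(alphabet_size)` bits, which holds when every character is drawn uniformly and independently. The sketch below works through that arithmetic for the alphabets used in this section; `password_entropy_bits` is a hypothetical helper name. Requiring one character from every class lowers the true figure slightly, so the result is best read as an estimate.

```python
import math
import string


def password_entropy_bits(length, alphabet_size):
    """Upper bound on entropy, assuming uniform independent characters."""
    return length * math.log2(alphabet_size)


# 52 letters + 10 digits + 32 punctuation characters
full_alphabet = len(string.ascii_letters + string.digits + string.punctuation)
# letters and digits only
plain_alphabet = len(string.ascii_letters + string.digits)

print(full_alphabet, round(password_entropy_bits(16, full_alphabet), 1))    # 94 104.9
print(plain_alphabet, round(password_entropy_bits(16, plain_alphabet), 1))  # 62 95.3
```

In other words, dropping the special characters at length 16 costs roughly 10 bits, which about two extra alphanumeric characters would recover.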
15. QR Code Generator

```python
"""
QR code generator.
Purpose: generate QR code images.
"""
from pathlib import Path

import qrcode


def generate_qr(data, output_file, size=10):
    """Generate one QR code image."""
    qr = qrcode.QRCode(
        version=1,  # smallest symbol; fit=True below grows it when needed
        error_correction=qrcode.constants.ERROR_CORRECT_L,
        box_size=size,
        border=4,
    )
    qr.add_data(data)
    qr.make(fit=True)

    img = qr.make_image(fill_color="black", back_color="white")
    img.save(output_file)
    print(f"Generated QR code: {output_file}")


def generate_qr_batch(data_list, output_dir):
    """Generate one numbered QR code image per item."""
    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    for i, data in enumerate(data_list, 1):
        generate_qr(data, str(output_path / f"qr_{i:03d}.png"))


if __name__ == "__main__":
    generate_qr('https://blog.sharezone.cn', 'website_qr.png')

    urls = [
        'https://google.com',
        'https://github.com',
        'https://python.org',
    ]
    generate_qr_batch(urls, './qrcodes')
```
16. Markdown to HTML

```python
"""
Markdown to HTML.
Purpose: convert Markdown files to HTML in bulk.
"""
import html
from pathlib import Path

import markdown


def convert_md_to_html(md_file, html_file=None):
    """Convert a single file."""
    md_path = Path(md_file)
    if html_file is None:
        # with_suffix changes only the extension, not ".md" elsewhere in the path.
        html_file = md_path.with_suffix('.html')

    md_content = md_path.read_text(encoding='utf-8')
    html_content = markdown.markdown(
        md_content,
        extensions=['extra', 'codehilite', 'toc'],
    )

    html_template = f"""<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>{html.escape(md_path.stem)}</title>
<style>
body {{ font-family: Arial, sans-serif; max-width: 800px; margin: 0 auto; padding: 20px; }}
code {{ background: #f4f4f4; padding: 2px 5px; }}
pre {{ background: #f4f4f4; padding: 10px; overflow-x: auto; }}
</style>
</head>
<body>
{html_content}
</body>
</html>"""

    Path(html_file).write_text(html_template, encoding='utf-8')
    print(f"Converted: {md_file} -> {html_file}")


def batch_convert(input_dir, output_dir=None):
    """Convert every Markdown file in a directory."""
    if output_dir is None:
        output_dir = input_dir
    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    for md_file in sorted(Path(input_dir).glob('*.md')):
        convert_md_to_html(md_file, output_path / f"{md_file.stem}.html")


if __name__ == "__main__":
    convert_md_to_html('article.md')
    batch_convert('./posts', './html')
```
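VII. Summary

This article covered 16 practical Python automation scripts in five areas:

✅ File processing: batch renaming, image conversion, PDF handling, Excel operations
✅ Network requests: website monitoring, API calls, web scraping
✅ System operations: server monitoring, log analysis, automatic backup
✅ Data processing: CSV cleaning, JSON conversion, data visualization
✅ Utilities: password generation, QR codes, Markdown conversion

All scripts can be used as-is or adapted to your needs.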
Last updated: 2026-03-12
Tags: #Python #Automation #Scripts #ProductivityTools #Operations
Category: Programming/Python
Installing dependencies:

```shell
pip install pillow pypdf2 pandas openpyxl requests beautifulsoup4 psutil boto3 matplotlib seaborn qrcode markdown
```
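Architect's Commentary: Automation Scripts Should Grow from "Personal Productivity Tools" into "Governable Tools"

A Python automation script often starts from a one-off need: renaming files in bulk, syncing data, calling an API, processing a report. In an enterprise environment, however, a throwaway script that gets used again and again becomes a de facto production tool. Without version control, parameter validation, audit logging, error handling and permission boundaries, every additional script spreads the risk more widely.

The point of script governance is not to ban scripts but to give them an engineering baseline. At a minimum, each script should answer these questions: who may run it, what its inputs are, what it can affect, how to recover after a failure, where the execution record is kept, and whether it touches sensitive data. Without these constraints, automation turns one person's mistake into a batch incident.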
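Recommendations for Enterprise Adoption: Build an Internal Automation Tool Catalog

Commonly used Python scripts are best consolidated into an internal tool catalog:

Single entry point: scripts are exposed through a CLI, a web console or an Agent Tool Registry instead of being scattered across personal directories.
Parameter declaration: every script states its parameter types, default values, required parameters, dangerous parameters and examples.
Access control: permissions are granted by tier: read-only, low-risk write, high-risk write.
Execution audit: each run records the operator, the parameters, the target resources, a summary of the output and the reason for any failure.
Testing and rollback: high-risk scripts must support dry-run and come with rollback or compensation instructions.

This is how scripts turn from personal know-how into team assets that newcomers can reuse and a platform can manage.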
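The catalog requirements above can be sketched with the standard library alone. The example below is only an illustration under stated assumptions: the cleanup tool itself, the `--execute` flag, the `audit` logger name and the fields of the audit record are all hypothetical choices, not an established convention. It shows declared parameters with defaults and help text, a dangerous flag that must be passed explicitly (so dry-run is the default), and one audit record per run.

```python
import argparse
import json
import logging
import os
import time
from pathlib import Path

audit_log = logging.getLogger("audit")  # hypothetical logger name


def build_parser():
    """Declare every parameter: type, default, required or optional, and risk."""
    parser = argparse.ArgumentParser(description="Report or delete files older than N days.")
    parser.add_argument("directory", type=Path, help="directory to clean (required)")
    parser.add_argument("--days", type=int, default=30, help="retention in days (default: 30)")
    parser.add_argument("--execute", action="store_true",
                        help="DANGEROUS: really delete; without this flag nothing is changed")
    return parser


def run(args):
    """Find files older than the cutoff, delete them only with --execute, and audit the run."""
    cutoff = time.time() - args.days * 86400
    targets = [p for p in sorted(args.directory.iterdir())
               if p.is_file() and p.stat().st_mtime < cutoff]
    if args.execute:
        for path in targets:
            path.unlink()

    # One structured record per run: operator, parameters, targets, mode.
    record = {
        "operator": os.environ.get("USER") or os.environ.get("USERNAME") or "unknown",
        "directory": str(args.directory),
        "days": args.days,
        "dry_run": not args.execute,
        "targets": [p.name for p in targets],
    }
    audit_log.info(json.dumps(record))
    return record
```

A dry run would then look like `run(build_parser().parse_args(["./tmp", "--days", "7"]))`; adding `"--execute"` to the argument list performs the deletion. Making the destructive mode opt-in means that forgetting a flag can never delete anything.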
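Combining Scripts with AI Agent Tool Calls

AI Agents work best with automation scripts that are structured, clearly bounded and auditable. The more disciplined the script, the less likely an Agent is to misuse it. When wrapping a Python script as an Agent tool, consider adding the following constraints:

The tool description states clearly what the tool is for and where it must not be used;
Input parameters are validated with JSON Schema, so that free text never flows directly into a command line;
Dry-run is the default, and high-risk write operations require human confirmation;
The output is structured and includes status, evidence, scope of impact and recommendations;
Access to files, the network and credentials runs in a least-privilege sandbox.

This is also one of the most important engineering foundations for putting the OpenClaw Skill system and enterprise AI Coding platforms into practice.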
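As a rough illustration of these constraints, the sketch below validates an Agent's JSON input before anything runs and returns a structured result. The validator is a deliberately tiny stand-in that supports only a few JSON Schema keywords; in practice a full JSON Schema validator library would take its place. `TOOL_SCHEMA`, `validate`, `call_tool` and the result fields are hypothetical names chosen for this example.

```python
import json

# Hypothetical input schema for a file-cleanup tool.
TOOL_SCHEMA = {
    "type": "object",
    "properties": {
        "directory": {"type": "string"},
        "days": {"type": "integer", "minimum": 1},
        "dry_run": {"type": "boolean"},
    },
    "required": ["directory"],
    "additionalProperties": False,
}

_TYPES = {"string": str, "integer": int, "boolean": bool}


def validate(params, schema):
    """Return a list of error strings; handles only the keywords used above."""
    if not isinstance(params, dict):
        return ["input must be a JSON object"]
    errors = [f"missing required field: {name}"
              for name in schema.get("required", []) if name not in params]
    for name, value in params.items():
        spec = schema["properties"].get(name)
        if spec is None:
            errors.append(f"unexpected field: {name}")
            continue
        expected = _TYPES[spec["type"]]
        # bool is a subclass of int in Python, so reject it for integers.
        if not isinstance(value, expected) or (expected is int and isinstance(value, bool)):
            errors.append(f"{name}: expected {spec['type']}")
        elif "minimum" in spec and value < spec["minimum"]:
            errors.append(f"{name}: must be >= {spec['minimum']}")
    return errors


def call_tool(raw_json):
    """Validate the input, default to dry-run, and return a structured result."""
    try:
        params = json.loads(raw_json)
    except json.JSONDecodeError as exc:
        return {"status": "rejected", "errors": [f"invalid JSON: {exc.msg}"]}
    errors = validate(params, TOOL_SCHEMA)
    if errors:
        return {"status": "rejected", "errors": errors}

    dry_run = params.get("dry_run", True)  # dry-run unless explicitly disabled
    return {
        # A real write would wait here for human confirmation.
        "status": "planned" if dry_run else "needs_confirmation",
        "dry_run": dry_run,
        "scope": {"directory": params["directory"], "days": params.get("days", 30)},
    }
```

For example, `call_tool('{"directory": "/data/tmp"}')` returns a result with status `planned`, while `call_tool('{"days": 0}')` is rejected with two errors. Because the parameters arrive as typed JSON fields and are never concatenated into a shell command, free text from the Agent has no path to the command line.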