月度工作量统计脚本

使用python3写了个脚本,用来统计月度工作量。本来只是在本地电脑运行的,想想还是把他移植到web端吧。说真的,要我一个个去搜索拉取,我真不敢说自己就能拉出来,而且一点差错都没有,且不说还要制表汇总,琐碎一地。好在,确实可以通过编写脚本来提高效率,不用一个个去拉取,啥也不说了,上代码。
web演示:https://www.thanx.top/re/index.php
直连地址,生产环境用:
http://106.15.4.153:8085/re/index.php
代码:



import pandas as pd
import argparse

def process_files(file1_path, file2_path, output_filename):
# 读取数据
df_jan = pd.read_excel(file1_path, sheet_name='导出数据')
df_feb = pd.read_excel(file2_path, sheet_name='导出数据')

# 定义项目名称映射规则
project_mapping = {
'无痛胃镜': '无胃',
'无痛肠镜': '无肠',
'EMR/APC': 'EMR',
'止血术': '止血',
'扩张术': '扩张',
'超声内镜': '超声',
'异物摄取': '异物',
'病例数': '总数'
}

# 定义项目顺序(使用映射后的名称)
project_order = [
'胃镜', '无胃', '肠镜', '无肠', '超声', 'EMR', 'ESD', 'ERCP',
'止血', '异物', '扩张', '其他'
]

# 定义统计函数
def count_stats(df):
stats = {
'胃镜': 0,
'无痛胃镜': 0,
'肠镜': 0,
'无痛肠镜': 0,
'超声内镜': 0,
'EMR/APC': 0,
'ESD': 0,
'ERCP': 0,
'止血术': 0,
'异物摄取': 0,
'扩张术': 0,
'其他': 0
}
for _, row in df.iterrows():
category = str(row['检查类别']).lower().strip()
diagnosis = str(row['镜下诊断']).lower().strip()

# 统计检查类别
if '十二指肠镜' in category or 'ercp' in category:
stats['ERCP'] += 1
elif '胃镜' in category and '无痛' not in category:
stats['胃镜'] += 1
elif '无痛胃镜' in category:
stats['无痛胃镜'] += 1
elif '肠镜' in category and '无痛' not in category:
stats['肠镜'] += 1
elif '无痛肠镜' in category:
stats['无痛肠镜'] += 1
elif '超声内镜' in category:
stats['超声内镜'] += 1
else:
stats['其他'] += 1

# 统计镜下诊断
if '扩张' in diagnosis:
stats['扩张术'] += 1
if 'esd' in diagnosis and 'esd术后' not in diagnosis:
stats['ESD'] += 1
if 'emr' in diagnosis or 'apc' in diagnosis:
stats['EMR/APC'] += 1
if '止血' in diagnosis:
stats['止血术'] += 1
if '异物' in diagnosis:
stats['异物摄取'] += 1

# 计算病例数
stats['病例数'] = (
stats['胃镜'] +
stats['无痛胃镜'] +
stats['肠镜'] +
stats['无痛肠镜'] +
stats['超声内镜'] +
stats['ERCP'] +
stats['其他']
)
return stats

# 获取上月和本月的统计数据
stats_jan = count_stats(df_jan)
stats_feb = count_stats(df_feb)

# 计算同比变化
def calculate_change(current, previous):
if previous == 0:
return 0
return round((current - previous) / previous * 100, 2)

# 创建内镜中心工作量统计 DataFrame
center_data = []
for project in project_order:
original_project = next(
(key for key, value in project_mapping.items() if value == project),
project
)
center_data.append({
'项目': project,
'本月数量': stats_feb.get(original_project, 0),
'上月数量': stats_jan.get(original_project, 0),
'同比变化(%)': calculate_change(stats_feb.get(original_project, 0), stats_jan.get(original_project, 0))
})

center_df = pd.DataFrame(center_data)

# 增加汇总行
summary_row = pd.DataFrame({
'项目': ['汇总'],
'本月数量': [
stats_feb['胃镜'] + stats_feb['无痛胃镜'] + stats_feb['肠镜'] +
stats_feb['无痛肠镜'] + stats_feb['超声内镜'] + stats_feb['ERCP'] + stats_feb['其他']
],
'上月数量': [
stats_jan['胃镜'] + stats_jan['无痛胃镜'] + stats_jan['肠镜'] +
stats_jan['无痛肠镜'] + stats_jan['超声内镜'] + stats_jan['ERCP'] + stats_jan['其他']
],
'同比变化(%)': [calculate_change(
stats_feb['胃镜'] + stats_feb['无痛胃镜'] + stats_feb['肠镜'] +
stats_feb['无痛肠镜'] + stats_feb['超声内镜'] + stats_feb['ERCP'] + stats_feb['其他'],
stats_jan['胃镜'] + stats_jan['无痛胃镜'] + stats_jan['肠镜'] +
stats_jan['无痛肠镜'] + stats_jan['超声内镜'] + stats_jan['ERCP'] + stats_jan['其他']
)],
'备注': ['']
})
center_df = pd.concat([center_df, summary_row], ignore_index=True)

# 统计医生工作量
def count_doctor_stats(df):
doctor_stats = {}
for _, row in df.iterrows():
doctor = row['报告医师']
category = str(row['检查类别']).lower().strip()
diagnosis = str(row['镜下诊断']).lower().strip()

if doctor not in doctor_stats:
doctor_stats[doctor] = {
'胃镜': 0,
'无痛胃镜': 0,
'肠镜': 0,
'无痛肠镜': 0,
'超声内镜': 0,
'ERCP': 0,
'EMR/APC': 0,
'ESD': 0,
'止血术': 0,
'扩张术': 0,
'异物摄取': 0,
'其他': 0,
'病例数': 0
}

if '十二指肠镜' in category or 'ercp' in category:
doctor_stats[doctor]['ERCP'] += 1
elif '胃镜' in category and '无痛' not in category:
doctor_stats[doctor]['胃镜'] += 1
elif '无痛胃镜' in category:
doctor_stats[doctor]['无痛胃镜'] += 1
elif '肠镜' in category and '无痛' not in category:
doctor_stats[doctor]['肠镜'] += 1
elif '无痛肠镜' in category:
doctor_stats[doctor]['无痛肠镜'] += 1
elif '超声内镜' in category:
doctor_stats[doctor]['超声内镜'] += 1
else:
doctor_stats[doctor]['其他'] += 1

if '扩张' in diagnosis:
doctor_stats[doctor]['扩张术'] += 1
if 'esd' in diagnosis and 'esd术后' not in diagnosis:
doctor_stats[doctor]['ESD'] += 1
if 'emr' in diagnosis or 'apc' in diagnosis:
doctor_stats[doctor]['EMR/APC'] += 1
if '止血' in diagnosis:
doctor_stats[doctor]['止血术'] += 1
if '异物' in diagnosis:
doctor_stats[doctor]['异物摄取'] += 1

# 计算病例数
doctor_stats[doctor]['病例数'] = (
doctor_stats[doctor]['胃镜'] +
doctor_stats[doctor]['无痛胃镜'] +
doctor_stats[doctor]['肠镜'] +
doctor_stats[doctor]['无痛肠镜'] +
doctor_stats[doctor]['超声内镜'] +
doctor_stats[doctor]['其他'] +
doctor_stats[doctor]['ERCP']
)
return doctor_stats

# 获取上月和本月的医生统计数据
doctor_stats_jan = count_doctor_stats(df_jan)
doctor_stats_feb = count_doctor_stats(df_feb)

# 创建医生工作量统计 DataFrame
doctor_data = []
for doctor, stats in doctor_stats_feb.items():
doctor_data.append({
'医师': doctor,
**{project_mapping.get(k, k): v for k, v in stats.items()}
})

doctor_df = pd.DataFrame(doctor_data)

# 增加汇总行
summary_row = pd.DataFrame({
'医师': ['汇总'],
**{project_mapping.get(k, k): [doctor_df[project_mapping.get(k, k)].sum()] for k in project_order},
'总数': [doctor_df['总数'].sum()]
})
doctor_df = pd.concat([doctor_df, summary_row], ignore_index=True)

# 统计护士工作量
def count_nurse_stats(df):
nurse_stats = {}
for _, row in df.iterrows():
nurse = row['助手']
category = str(row['检查类别']).lower().strip()
diagnosis = str(row['镜下诊断']).lower().strip()

if nurse not in nurse_stats:
nurse_stats[nurse] = {
'胃镜': 0,
'无痛胃镜': 0,
'肠镜': 0,
'无痛肠镜': 0,
'超声内镜': 0,
'ERCP': 0,
'EMR/APC': 0,
'ESD': 0,
'止血术': 0,
'扩张术': 0,
'异物摄取': 0,
'其他': 0,
'病例数': 0
}

if '十二指肠镜' in category or 'ercp' in category:
nurse_stats[nurse]['ERCP'] += 1
elif '胃镜' in category and '无痛' not in category:
nurse_stats[nurse]['胃镜'] += 1
elif '无痛胃镜' in category:
nurse_stats[nurse]['无痛胃镜'] += 1
elif '肠镜' in category and '无痛' not in category:
nurse_stats[nurse]['肠镜'] += 1
elif '无痛肠镜' in category:
nurse_stats[nurse]['无痛肠镜'] += 1
elif '超声内镜' in category:
nurse_stats[nurse]['超声内镜'] += 1
else:
nurse_stats[nurse]['其他'] += 1

if '扩张' in diagnosis:
nurse_stats[nurse]['扩张术'] += 1
if 'esd' in diagnosis and 'esd术后' not in diagnosis:
nurse_stats[nurse]['ESD'] += 1
if 'emr' in diagnosis or 'apc' in diagnosis:
nurse_stats[nurse]['EMR/APC'] += 1
if '止血' in diagnosis:
nurse_stats[nurse]['止血术'] += 1
if '异物' in diagnosis:
nurse_stats[nurse]['异物摄取'] += 1

# 计算病例数
nurse_stats[nurse]['病例数'] = (
nurse_stats[nurse]['胃镜'] +
nurse_stats[nurse]['无痛胃镜'] +
nurse_stats[nurse]['肠镜'] +
nurse_stats[nurse]['无痛肠镜'] +
nurse_stats[nurse]['超声内镜'] +
nurse_stats[nurse]['其他'] +
nurse_stats[nurse]['ERCP']
)
return nurse_stats

# 获取上月和本月的护士统计数据
nurse_stats_jan = count_nurse_stats(df_jan)
nurse_stats_feb = count_nurse_stats(df_feb)

# 创建护士工作量统计 DataFrame
nurse_data = []
for nurse, stats in nurse_stats_feb.items():
nurse_data.append({
'护士': nurse,
**{project_mapping.get(k, k): v for k, v in stats.items()}
})

nurse_df = pd.DataFrame(nurse_data)

# 增加汇总行
summary_row = pd.DataFrame({
'护士': ['汇总'],
**{project_mapping.get(k, k): [nurse_df[project_mapping.get(k, k)].sum()] for k in project_order},
'总数': [nurse_df['总数'].sum()]
})
nurse_df = pd.concat([nurse_df, summary_row], ignore_index=True)

# 保存更新后的Excel文件
with pd.ExcelWriter(output_filename) as writer:
center_df.to_excel(writer, sheet_name='内镜中心工作量统计', index=False)
doctor_df.to_excel(writer, sheet_name='医生工作量统计', index=False)
nurse_df.to_excel(writer, sheet_name='护士工作量统计', index=False)

if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Process Excel files.')
parser.add_argument('file1', help='Path to the first Excel file')
parser.add_argument('file2', help='Path to the second Excel file')
parser.add_argument('output', help='Path to save the output Excel file')

args = parser.parse_args()

process_files(args.file1, args.file2, args.output)
This entry was posted in Resourse. Bookmark the permalink.

Leave a Reply

Your email address will not be published. Required fields are marked *