fork download
  1. import csv
  2. import os
  3. import json
  4. import sys
  5. from datetime import datetime
  6.  
  7. def calculate_device_usage(input_file, output_file):
  8. """
  9. 统计智能设备使用时间
  10. :param input_file: 输入CSV文件路径
  11. :param output_file: 输出CSV文件路径
  12. """
  13. # 检查输入文件是否存在
  14. if not os.path.exists(input_file):
  15. raise FileNotFoundError(f"输入文件不存在: {input_file}")
  16.  
  17. # 读取并处理数据
  18. device_data = {}
  19. with open(input_file, 'r', encoding='utf-8') as f:
  20. reader = csv.DictReader(f)
  21. for row_num, row in enumerate(reader, 1):
  22. try:
  23. device_id = row['device_id']
  24. start = datetime.fromisoformat(row['start_time'])
  25. end = datetime.fromisoformat(row['end_time']) if row['end_time'] else datetime.now()
  26.  
  27. if device_id not in device_data:
  28. device_data[device_id] = []
  29. device_data[device_id].append((start, end))
  30. except (KeyError, ValueError) as e:
  31. print(f"警告: 第 {row_num} 行数据格式错误 - {str(e)}")
  32. continue
  33.  
  34. # 如果没有找到有效数据
  35. if not device_data:
  36. print("错误: 输入文件中没有找到有效数据")
  37. return
  38.  
  39. # 计算每个设备的总使用时间(秒)
  40. results = []
  41. for device_id, periods in device_data.items():
  42. # 按开始时间排序
  43. periods.sort()
  44.  
  45. # 合并重叠时间段
  46. merged = []
  47. current_start, current_end = periods[0]
  48. for start, end in periods[1:]:
  49. if start <= current_end:
  50. current_end = max(current_end, end)
  51. else:
  52. merged.append((current_start, current_end))
  53. current_start, current_end = start, end
  54. merged.append((current_start, current_end))
  55.  
  56. # 计算总时间(秒)
  57. total_seconds = sum((end - start).total_seconds() for start, end in merged)
  58.  
  59. # 转换为可读格式
  60. hours, remainder = divmod(total_seconds, 3600)
  61. minutes, seconds = divmod(remainder, 60)
  62. readable = f"{int(hours)}小时{int(minutes)}分{int(seconds)}秒"
  63.  
  64. results.append({
  65. 'device_id': device_id,
  66. 'total_seconds': total_seconds,
  67. 'readable_time': readable
  68. })
  69.  
  70. # 写入结果
  71. with open(output_file, 'w', encoding='utf-8', newline='') as f:
  72. writer = csv.DictWriter(f, fieldnames=['device_id', 'total_seconds', 'readable_time'])
  73. writer.writeheader()
  74. writer.writerows(results)
  75.  
  76. print(f"统计完成!结果已保存至: {output_file}")
  77. print(f"处理了 {len(results)} 台设备的使用记录")
  78.  
  79. # 使用示例
  80. if __name__ == "__main__":
  81. try:
  82. # 替换为实际文件路径
  83. input_file = "device_logs.csv" # 输入文件名
  84. output_file = "usage_report.csv" # 输出文件名
  85.  
  86. # 检查输入文件是否存在
  87. if not os.path.exists(input_file):
  88. # 创建示例文件(仅用于测试)
  89. print(f"创建示例文件: {input_file}")
  90. sample_data = [
  91. ['device_id', 'start_time', 'end_time'],
  92. ['D001', '2023-06-01 08:30:00', '2023-06-01 10:15:00'],
  93. ['D001', '2023-06-01 14:00:00', '2023-06-01 16:45:00'],
  94. ['D002', '2023-06-01 09:00:00', '2023-06-01 11:30:00'],
  95. ['D001', '2023-06-02 13:30:00', '2023-06-02 15:00:00'],
  96. ['D003', '2023-06-02 10:00:00', ''], # 未结束的使用记录
  97. ]
  98.  
  99. with open(input_file, 'w', encoding='utf-8', newline='') as f:
  100. writer = csv.writer(f)
  101. writer.writerows(sample_data)
  102. print("已创建示例数据文件,程序将使用此文件运行")
  103.  
  104. # 执行统计
  105. calculate_device_usage(input_file, output_file)
  106.  
  107. except Exception as e:
  108. print(f"程序运行时出错: {str(e)}")
  109. print("请检查输入文件格式是否正确")
  110. print("文件格式要求:")
  111. print("1. CSV格式,包含表头: device_id,start_time,end_time")
  112. print("2. 时间格式: YYYY-MM-DD HH:MM:SS")
  113. print("3. 空结束时间表示设备仍在运行")
Success #stdin #stdout 0.03s 10200KB
stdin
Standard input is empty
stdout
创建示例文件: device_logs.csv
程序运行时出错: [Errno 13] Permission denied: 'device_logs.csv'
请检查输入文件格式是否正确
文件格式要求:
1. CSV格式,包含表头: device_id,start_time,end_time
2. 时间格式: YYYY-MM-DD HH:MM:SS
3. 空结束时间表示设备仍在运行