fork download
  1. import csv
  2. import os
  3. from datetime import datetime
  4.  
  5. def calculate_device_usage(input_file, output_file):
  6. """
  7. 统计智能设备使用时间
  8. :param input_file: 输入CSV文件路径
  9. :param output_file: 输出CSV文件路径
  10. """
  11. # 检查输入文件是否存在
  12. if not os.path.exists(input_file):
  13. raise FileNotFoundError(f"输入文件不存在: {input_file}")
  14.  
  15. # 读取并处理数据
  16. device_data = {}
  17. permissions={}
  18. with open(input_file, 'r+', encoding='utf-8') as f:
  19. reader = csv.DictReader(f)
  20. for row_num, row in enumerate(reader, 1):
  21. try:
  22. device_id = row['device_id']
  23. start = datetime.fromisoformat(row['start_time'])
  24. end = datetime.fromisoformat(row['end_time']) if row['end_time'] else datetime.now()
  25.  
  26. if device_id not in device_data:
  27. device_data[device_id] = []
  28. device_data[device_id].append((start, end))
  29. except (KeyError, ValueError) as e:
  30. print(f"警告: 第 {row_num} 行数据格式错误 - {str(e)}")
  31. continue
  32.  
  33. # 如果没有找到有效数据
  34. if not device_data:
  35. print("错误: 输入文件中没有找到有效数据")
  36. return
  37.  
  38. # 计算每个设备的总使用时间(秒)
  39. results = []
  40. for device_id, periods in device_data.items():
  41. # 按开始时间排序
  42. periods.sort()
  43.  
  44. # 合并重叠时间段
  45. merged = []
  46. current_start, current_end = periods[0]
  47. for start, end in periods[1:]:
  48. if start <= current_end:
  49. current_end = max(current_end, end)
  50. else:
  51. merged.append((current_start, current_end))
  52. current_start, current_end = start, end
  53. merged.append((current_start, current_end))
  54.  
  55. # 计算总时间(秒)
  56. total_seconds = sum((end - start).total_seconds() for start, end in merged)
  57.  
  58. # 转换为可读格式
  59. hours, remainder = divmod(total_seconds, 3600)
  60. minutes, seconds = divmod(remainder, 60)
  61. readable = f"{int(hours)}小时{int(minutes)}分{int(seconds)}秒"
  62.  
  63. results.append({
  64. 'device_id': device_id,
  65. 'total_seconds': total_seconds,
  66. 'readable_time': readable
  67. })
  68.  
  69. # 写入结果
  70. with open(output_file, 'w', encoding='utf-8', newline='') as f:
  71. writer = csv.DictWriter(f, fieldnames=['device_id', 'total_seconds', 'readable_time'])
  72. writer.writeheader()
  73. writer.writerows(results)
  74.  
  75. print(f"统计完成!结果已保存至: {output_file}")
  76. print(f"处理了 {len(results)} 台设备的使用记录")
  77.  
  78. # 使用示例
  79. if __name__ == "__main__":
  80. try:
  81. # 替换为实际文件路径
  82. input_file = "device_logs.csv" # 输入文件名
  83. output_file = "usage_report.csv" # 输出文件名
  84.  
  85. # 检查输入文件是否存在
  86. if not os.path.exists(input_file):
  87. # 创建示例文件(仅用于测试)
  88. print(f"创建示例文件: {input_file}")
  89. sample_data = [
  90. ['device_id', 'start_time', 'end_time'],
  91. ['D001', '2023-06-01 08:30:00', '2023-06-01 10:15:00'],
  92. ['D001', '2023-06-01 14:00:00', '2023-06-01 16:45:00'],
  93. ['D002', '2023-06-01 09:00:00', '2023-06-01 11:30:00'],
  94. ['D001', '2023-06-02 13:30:00', '2023-06-02 15:00:00'],
  95. ['D003', '2023-06-02 10:00:00', ''], # 未结束的使用记录
  96. ]
  97.  
  98. with open(input_file, 'w', encoding='utf-8', newline='') as f:
  99. writer = csv.writer(f)
  100. writer.writerows(sample_data)
  101. print("已创建示例数据文件,程序将使用此文件运行")
  102.  
  103. # 执行统计
  104. calculate_device_usage(input_file, output_file)
  105.  
  106. except Exception as e:
  107. print(f"程序运行时出错: {str(e)}")
  108. print("请检查输入文件格式是否正确")
  109. print("文件格式要求:")
  110. print("1. CSV格式,包含表头: device_id,start_time,end_time")
  111. print("2. 时间格式: YYYY-MM-DD HH:MM:SS")
  112. print("3. 空结束时间表示设备仍在运行")
Success #stdin #stdout 0.04s 10204KB
stdin
Standard input is empty
stdout
创建示例文件: device_logs.csv
程序运行时出错: [Errno 13] Permission denied: 'device_logs.csv'
请检查输入文件格式是否正确
文件格式要求:
1. CSV格式,包含表头: device_id,start_time,end_time
2. 时间格式: YYYY-MM-DD HH:MM:SS
3. 空结束时间表示设备仍在运行