update analysis

pull/21891/head
ytqh 1 year ago
parent 0c1020b256
commit 475a852a92

@ -28,8 +28,6 @@ class AnswersSummaryAnalysisApi(Resource):
return {"error": "Request must be JSON"}, 400
data = request.get_json()
categories = data.get('categories')
correct_answer = data.get('correct_answer') # a list of correct answer
workflow_run_id = data.get('workflow_run_id')
# read the arg of this workflow run
@ -45,29 +43,42 @@ class AnswersSummaryAnalysisApi(Resource):
try:
args_json = json.loads(workflow_run_args)
user_answers_file_id = args_json.get('user_answers').get('related_id')
exam_answers_file_id = args_json.get('exam_answers').get('related_id')
except json.JSONDecodeError:
return {"error": "workflow_run_args must be a valid JSON string"}, 400
if not user_answers_file_id:
return {"error": "file_id is required"}, 400
if not categories:
return {"error": "categories is required"}, 400
# Read the file content with encoding detection
file_content, detected_encoding = self._read_file_with_encoding_detection(user_answers_file_id)
if not file_content:
return {"error": "Failed to read file or file not found"}, 404
# Parse the answers
parsed_answers = self._parse_answers(file_content)
if not parsed_answers:
return {"error": "Failed to parse answers from file"}, 400
return {"error": "user_answers file_id is required"}, 400
if not exam_answers_file_id:
return {"error": "exam_answers file_id is required"}, 400
# Read the exam answers file to get categories and correct answers
exam_answers_file_content, _ = self._read_file_with_encoding_detection(exam_answers_file_id)
if not exam_answers_file_content:
return {"error": "Failed to read exam answers file or file not found"}, 404
# Parse the exam answers file
exam_answers, categories, correct_answer = self._parse_exam_answers(exam_answers_file_content)
if not categories or not correct_answer:
return {"error": "Failed to parse categories and correct answers from exam file"}, 400
# Read the user answers file content with encoding detection
user_answers_file_content, _ = self._read_file_with_encoding_detection(user_answers_file_id)
if not user_answers_file_content:
return {"error": "Failed to read user answers file or file not found"}, 404
# Parse the user answers
user_answers = self._parse_answers(user_answers_file_content)
if not user_answers:
return {"error": "Failed to parse user answers from file"}, 400
# Calculate category statistics
summary_analysis = self._calculate_category_statistics(parsed_answers, correct_answer, categories)
summary_analysis = self._calculate_category_statistics(user_answers, correct_answer, categories)
# Return the response
return jsonify({'user_answers': parsed_answers, 'summary_analysis': summary_analysis})
return jsonify(
{'user_answers': user_answers, 'summary_analysis': summary_analysis, 'exam_answers': exam_answers}
)
def _read_file_with_encoding_detection(self, file_id: str) -> Tuple[Optional[str], Optional[str]]:
"""Read file content with automatic encoding detection."""
@ -83,6 +94,8 @@ class AnswersSummaryAnalysisApi(Resource):
# Try multiple encodings if needed
encodings_to_try = [encoding, 'utf-8', 'gbk', 'gb2312', 'iso-8859-1', 'latin-1']
# Filter out any None values
encodings_to_try = [enc for enc in encodings_to_try if enc is not None]
decoded_content = None
detected_encoding = None
@ -99,6 +112,83 @@ class AnswersSummaryAnalysisApi(Resource):
print(f"Error reading file: {str(e)}")
return None, None
def _parse_exam_answers(self, file_content: str) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]], List[str]]:
"""Parse exam answers from the file content.
Expected format is CSV with columns:
- 题号 (Question Number)
- 题目分类 (Category)
- 正确答案 (Correct Answer)
- 题目分析 (Question Analysis)
Returns:
Tuple containing:
- exam_answers: List of dictionaries with question details
- categories: List of categories with question numbers
- correct_answer: List of correct answers
"""
try:
import csv
from collections import defaultdict
from io import StringIO
# Create a CSV reader from the string content
csv_file = StringIO(file_content)
csv_reader = csv.reader(csv_file)
# Get the header row
header = next(csv_reader, None)
if not header:
return [], [], []
exam_answers = []
category_map = defaultdict(list)
correct_answers = [""] * 1000 # Initialize with empty strings, we'll trim later
max_question_num = 0
for row in csv_reader:
if not row or len(row) < 3: # Skip rows with insufficient data
continue
try:
question_num = int(row[0].strip())
category = row[1].strip()
correct_ans = row[2].strip()
analysis = row[3].strip() if len(row) > 3 else ""
# Record the maximum question number
max_question_num = max(max_question_num, question_num)
# Add to exam_answers
exam_answers.append(
{
"question_num": question_num,
"category": category,
"correct_answer": correct_ans,
"analysis": analysis,
}
)
# Map category to question numbers
category_map[category].append(str(question_num))
# Set correct answer
correct_answers[question_num - 1] = correct_ans
except (ValueError, IndexError):
continue
# Trim correct_answers to the maximum question number
correct_answers = correct_answers[:max_question_num]
# Convert category_map to the expected categories format
categories = [{"name": cat, "items": items} for cat, items in category_map.items()]
return exam_answers, categories, correct_answers
except Exception as e:
# Log the exception for debugging
print(f"Error parsing exam answers: {str(e)}")
return [], [], []
def _parse_answers(self, file_content: str) -> List[Dict[str, Any]]:
"""Parse answers from the file content.

Loading…
Cancel
Save