diff --git a/api/controllers/inner_tools/answers_summary_analysis.py b/api/controllers/inner_tools/answers_summary_analysis.py index 04d5d54d7c..e510957f32 100644 --- a/api/controllers/inner_tools/answers_summary_analysis.py +++ b/api/controllers/inner_tools/answers_summary_analysis.py @@ -28,8 +28,6 @@ class AnswersSummaryAnalysisApi(Resource): return {"error": "Request must be JSON"}, 400 data = request.get_json() - categories = data.get('categories') - correct_answer = data.get('correct_answer') # a list of correct answer workflow_run_id = data.get('workflow_run_id') # read the arg of this workflow run @@ -45,29 +43,42 @@ class AnswersSummaryAnalysisApi(Resource): try: args_json = json.loads(workflow_run_args) user_answers_file_id = args_json.get('user_answers').get('related_id') + exam_answers_file_id = args_json.get('exam_answers').get('related_id') except json.JSONDecodeError: return {"error": "workflow_run_args must be a valid JSON string"}, 400 if not user_answers_file_id: - return {"error": "file_id is required"}, 400 - if not categories: - return {"error": "categories is required"}, 400 - - # Read the file content with encoding detection - file_content, detected_encoding = self._read_file_with_encoding_detection(user_answers_file_id) - if not file_content: - return {"error": "Failed to read file or file not found"}, 404 - - # Parse the answers - parsed_answers = self._parse_answers(file_content) - if not parsed_answers: - return {"error": "Failed to parse answers from file"}, 400 + return {"error": "user_answers file_id is required"}, 400 + if not exam_answers_file_id: + return {"error": "exam_answers file_id is required"}, 400 + + # Read the exam answers file to get categories and correct answers + exam_answers_file_content, _ = self._read_file_with_encoding_detection(exam_answers_file_id) + if not exam_answers_file_content: + return {"error": "Failed to read exam answers file or file not found"}, 404 + + # Parse the exam answers file + exam_answers, categories, correct_answer = self._parse_exam_answers(exam_answers_file_content) + if not categories or not correct_answer: + return {"error": "Failed to parse categories and correct answers from exam file"}, 400 + + # Read the user answers file content with encoding detection + user_answers_file_content, _ = self._read_file_with_encoding_detection(user_answers_file_id) + if not user_answers_file_content: + return {"error": "Failed to read user answers file or file not found"}, 404 + + # Parse the user answers + user_answers = self._parse_answers(user_answers_file_content) + if not user_answers: + return {"error": "Failed to parse user answers from file"}, 400 # Calculate category statistics - summary_analysis = self._calculate_category_statistics(parsed_answers, correct_answer, categories) + summary_analysis = self._calculate_category_statistics(user_answers, correct_answer, categories) # Return the response - return jsonify({'user_answers': parsed_answers, 'summary_analysis': summary_analysis}) + return jsonify( + {'user_answers': user_answers, 'summary_analysis': summary_analysis, 'exam_answers': exam_answers} + ) def _read_file_with_encoding_detection(self, file_id: str) -> Tuple[Optional[str], Optional[str]]: """Read file content with automatic encoding detection.""" @@ -83,6 +94,8 @@ class AnswersSummaryAnalysisApi(Resource): # Try multiple encodings if needed encodings_to_try = [encoding, 'utf-8', 'gbk', 'gb2312', 'iso-8859-1', 'latin-1'] + # Filter out any None values + encodings_to_try = [enc for enc in encodings_to_try if enc is not None] decoded_content = None detected_encoding = None @@ -99,6 +112,83 @@ class AnswersSummaryAnalysisApi(Resource): print(f"Error reading file: {str(e)}") return None, None + def _parse_exam_answers(self, file_content: str) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]], List[str]]: + """Parse exam answers from the file content. + + Expected format is CSV with columns: + - 题号 (Question Number) + - 题目分类 (Category) + - 正确答案 (Correct Answer) + - 题目分析 (Question Analysis) + + Returns: + Tuple containing: + - exam_answers: List of dictionaries with question details + - categories: List of categories with question numbers + - correct_answer: List of correct answers + """ + try: + import csv + from collections import defaultdict + from io import StringIO + + # Create a CSV reader from the string content + csv_file = StringIO(file_content) + csv_reader = csv.reader(csv_file) + + # Get the header row + header = next(csv_reader, None) + if not header: + return [], [], [] + + exam_answers = [] + category_map = defaultdict(list) + correct_answers = [""] * 1000 # Initialize with empty strings, we'll trim later + max_question_num = 0 + + for row in csv_reader: + if not row or len(row) < 3: # Skip rows with insufficient data + continue + + try: + question_num = int(row[0].strip()) + category = row[1].strip() + correct_ans = row[2].strip() + analysis = row[3].strip() if len(row) > 3 else "" + + # Record the maximum question number + max_question_num = max(max_question_num, question_num) + + # Add to exam_answers + exam_answers.append( + { + "question_num": question_num, + "category": category, + "correct_answer": correct_ans, + "analysis": analysis, + } + ) + + # Map category to question numbers + category_map[category].append(str(question_num)) + + # Set correct answer + correct_answers[question_num - 1] = correct_ans + except (ValueError, IndexError): + continue + + # Trim correct_answers to the maximum question number + correct_answers = correct_answers[:max_question_num] + + # Convert category_map to the expected categories format + categories = [{"name": cat, "items": items} for cat, items in category_map.items()] + + return exam_answers, categories, correct_answers + except Exception as e: + # Log the exception for debugging + print(f"Error parsing exam answers: {str(e)}") + return [], [], [] + def _parse_answers(self, file_content: str) -> List[Dict[str, Any]]: """Parse answers from the file content.