diff --git a/attendance_app/__init__.py b/attendance_app/__init__.py new file mode 100644 index 0000000..487492b --- /dev/null +++ b/attendance_app/__init__.py @@ -0,0 +1 @@ +"""Attendance tracking application modules.""" diff --git a/attendance_app/database.py b/attendance_app/database.py new file mode 100644 index 0000000..5d3ccde --- /dev/null +++ b/attendance_app/database.py @@ -0,0 +1,88 @@ +"""Database access layer for user information.""" + +import sqlite3 +from typing import Optional, Tuple + + +class UserDatabase: + """Interface for user database access.""" + + def lookup_user(self, user_id: str) -> Optional[Tuple]: + """ + Look up user information by ID. + + Args: + user_id: User login ID + + Returns: + Tuple of (number, name_kanji, name_kana, depart_name, pos_code) + or None if not found + """ + raise NotImplementedError + + +class SQLiteUserDatabase(UserDatabase): + """SQLite implementation of user database.""" + + def __init__(self, db_path: str): + """ + Initialize database connection. + + Args: + db_path: Path to SQLite database file + """ + self.db_path = db_path + self.conn = None + self.cursor = None + + def __enter__(self): + """Context manager entry.""" + self.conn = sqlite3.connect(self.db_path) + self.cursor = self.conn.cursor() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Context manager exit.""" + if self.cursor: + self.cursor.close() + if self.conn: + self.conn.close() + + def lookup_user(self, user_id: str) -> Optional[Tuple]: + """ + Look up user information by ID. + + Args: + user_id: User login ID + + Returns: + Tuple of (number, name_kanji, name_kana, depart_name, pos_code) + or None if not found + """ + if not self.cursor: + raise RuntimeError("Database not opened. Use as context manager.") + + query = """ + SELECT number, name_kanji, name_kana, depart_name, pos_code + FROM user + WHERE id = ? + """ + result = self.cursor.execute(query, (user_id,)) + return result.fetchone() + + +class MockUserDatabase(UserDatabase): + """Mock implementation for testing.""" + + def __init__(self, user_data: dict): + """ + Initialize with mock user data. + + Args: + user_data: Dict mapping user_id to tuple of user info + """ + self.user_data = user_data + + def lookup_user(self, user_id: str) -> Optional[Tuple]: + """Look up user from mock data.""" + return self.user_data.get(user_id) diff --git a/attendance_app/excel_output.py b/attendance_app/excel_output.py new file mode 100644 index 0000000..4a0cf24 --- /dev/null +++ b/attendance_app/excel_output.py @@ -0,0 +1,232 @@ +"""Excel output generation for attendance reports.""" + +from datetime import datetime +import datetime as dt +import tempfile +import os +from typing import List, Dict + +try: + from openpyxl import load_workbook + from openpyxl.worksheet.table import Table, TableStyleInfo + from openpyxl.styles import Alignment + from openpyxl.styles.fonts import Font as xlfont + OPENPYXL_AVAILABLE = True +except ImportError: + OPENPYXL_AVAILABLE = False + + +class ExcelGenerator: + """Generate Excel attendance reports.""" + + IML_ROOM_A = [ + "01", "02", "03", "04", "05", "06", + "11", "12", "13", "14", "15", "16", + "21", "22", "23", "24", "25", "26", + "31", "32", "33", "34", "35", "36", + "41", "42", "43", "44", + "51", "52", "53", "54", + "K01", "K03", "61" + ] + + def __init__(self, template_path: str): + """ + Initialize Excel generator. + + Args: + template_path: Path to Excel template file + """ + if not OPENPYXL_AVAILABLE: + raise ImportError("openpyxl is required for Excel generation") + + self.template_path = template_path + + def generate(self, records: List[List], from_time: datetime, + to_time: datetime, room_filters: Dict[str, bool]) -> str: + """ + Generate Excel file from attendance records. + + Args: + records: List of attendance records + from_time: Start time for report + to_time: End time for report + room_filters: Dict of enabled room filters + + Returns: + Path to generated temporary Excel file + """ + wb = load_workbook(self.template_path) + + # Sort records by user_id + login time + records.sort(key=lambda x: x[4] + x[1]) + + # Process each room + for room in ["GRL", "ML1", "ML2", "IML"]: + # Remove existing sheet + sheet_name = room + "出席表" + if sheet_name in wb.sheetnames: + wb.remove(wb[sheet_name]) + + if room in room_filters and room_filters[room]: + self._create_room_sheet(wb, room, records, from_time, to_time, room_filters) + else: + # Also remove seating chart if room not selected + seating_name = room + "座席表" + if seating_name in wb.sheetnames: + wb.remove(wb[seating_name]) + + # Save to temporary file + filename = "attendancebook" + from_time.strftime("%Y%m%d%H%M") + ".xlsx" + tname = tempfile.mktemp(suffix=filename) + wb.save(tname) + + return tname + + def _create_room_sheet(self, wb, room: str, records: List[List], + from_time: datetime, to_time: datetime, + room_filters: Dict[str, bool]): + """Create attendance sheet for a specific room.""" + ws = wb.create_sheet(title=room + "出席表") + ws.sheet_properties.pageSetUpPr.fitToPage = True + + # Determine room prefix + if room == "IML": + prefix = "IML" + table_prefix = "HSJ" + else: + prefix = room + table_prefix = room + + # Filter records for this room + extract = [] + for record in records: + host, logon, logoff, _, _, student_no, name, reading, dep, pos, status = record + + if host is None or not host.startswith(prefix): + continue + + # Handle IML sub-rooms + if room == "IML": + machine_num = host[4:] + in_room_a = machine_num in self.IML_ROOM_A + + if in_room_a and not room_filters.get("IML-A", False): + continue + if not in_room_a and not room_filters.get("IML-B", False): + continue + + extract.append([ + host, str(student_no), logon[:5], logoff[-5:], + name, reading, dep, "", pos + ]) + + # Set column widths + ws.column_dimensions["A"].width = 8.5 + ws.column_dimensions["B"].width = 10.0 + ws.column_dimensions["C"].width = 7.0 + ws.column_dimensions["D"].width = 7.0 + ws.column_dimensions["E"].width = 16.0 + ws.column_dimensions["F"].width = 18.0 + ws.column_dimensions["G"].width = 17.8 + ws.column_dimensions["H"].width = 17.0 + + # Add header rows + ws.append([ + "学生", "", "", "", + from_time.strftime("%Y/%m/%d"), + from_time.strftime("%H:%M") + "〜" + to_time.strftime("%H:%M") + ]) + ws.append(["ホスト名", "学籍番号", "開始", "終了", "名前", "よみ", "所属", "備考"]) + + # Process student records + stafflist = [] + prev = None + align = Alignment(wrap_text=True, vertical="center", horizontal="center") + + for e in extract: + try: + login = dt.time.fromisoformat(e[2]) + logout = dt.time.fromisoformat(e[3]) + font = None + + # Check for missing logoff + if dt.time(18, 45) <= login and logout >= dt.time(19, 50): + e[3] = "記録無" + e[7] = "ログオフ記録無" + font = xlfont(color="b00000") + except (ValueError, TypeError): + # Handle invalid time formats + font = None + + # Separate students from staff + if e[8] == "01" and not e[1].startswith("M"): + # Check if same user as previous (multiple logins) + if prev == e[1]: + row = str(ws.max_row) + ws['C' + row] = ws['C' + row].value + "\n" + str(e[2]) + ws['D' + row] = ws['D' + row].value + "\n" + e[3] + else: + ws.append(e[:-1]) + row = str(ws.max_row) + # Apply alignment to all cells + for col in ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H']: + ws[col + row].alignment = align + if font: + ws[col + row].font = font + else: + stafflist.append(e[:-1]) + + prev = e[1] + + # Create student table + if ws.max_row > 2: + tab = Table( + displayName="StudentTable" + table_prefix, + ref="A2:H" + str(ws.max_row) + ) + style = TableStyleInfo( + name="TableStyleLight1", + showFirstColumn=False, + showLastColumn=False, + showRowStripes=True, + showColumnStripes=False + ) + tab.tableStyleInfo = style + ws.add_table(tab) + + # Add student count + ws.append([]) + ws.append(['合計', '=COUNTA(StudentTable' + table_prefix + '[学籍番号])', '人']) + + # Add staff section + ws.append([]) + ws.append(["スタッフ"]) + ws.append(["ホスト名", "ユーザ名", "開始", "終了", "名前", "よみ", "所属", "ステータス"]) + staff_start = ws.max_row + + for e in stafflist: + ws.append(e) + row = str(ws.max_row) + for col in ['A', 'B', 'C', 'D', 'E', 'F', 'G']: + ws[col + row].alignment = align + + if ws.max_row == staff_start: + ws.append(["-", "-", "-", "-", "-", "-", "-"]) + + # Create staff table + tab = Table( + displayName="StaffTable" + table_prefix, + ref="A" + str(staff_start) + ":G" + str(ws.max_row) + ) + style = TableStyleInfo( + name="TableStyleLight2", + showFirstColumn=False, + showLastColumn=False, + showRowStripes=True, + showColumnStripes=False + ) + tab.tableStyleInfo = style + ws.add_table(tab) + + # Hide status column + ws.column_dimensions['H'].hidden = True diff --git a/attendance_app/log_parser.py b/attendance_app/log_parser.py new file mode 100644 index 0000000..e45fee7 --- /dev/null +++ b/attendance_app/log_parser.py @@ -0,0 +1,98 @@ +"""Log parsing functions for attendance tracking.""" + +import os +import re +from datetime import datetime, timedelta +from typing import List, Tuple + +from .utils import machine_name + + +def get_log_files(logdir: str, start_date: datetime) -> List[str]: + """ + Get list of relevant log files for a given date range. + + Args: + logdir: Directory containing log files + start_date: Start date for log search + + Returns: + List of log file paths to process + """ + tail = start_date + timedelta(7) # a week after + lower = "access_log-%04d%02d%02d" % (start_date.year, start_date.month, start_date.day) + higher = "access_log-%04d%02d%02d" % (tail.year, tail.month, tail.day) + + logfiles = [ + fname for fname in os.listdir(logdir) + if fname.startswith("access_log-") and lower <= fname <= higher + ] + + if not logfiles: + logfiles.append('access_log') + + return logfiles + + +def parse_log_line(line: str) -> Tuple[str, datetime, str] | None: + """ + Parse a single log line for logon/logoff events. + + Args: + line: Raw log line + + Returns: + Tuple of (machine_name, datetime, action) or None if line doesn't match + """ + if "logonoff" not in line: + return None + + match = re.match( + r'^([0-9.]*).*\[(\d+/\w+/\d+:\d+:\d+:\d+).*logonoff\?(.*) HTTP', + line + ) + + if not match: + return None + + ip, date_str, action = match.groups() + + try: + date = datetime.strptime(date_str, '%d/%b/%Y:%H:%M:%S') + host = machine_name(ip) + return (host, date, action) + except ValueError: + return None + + +def parse_log_files(logdir: str, logfiles: List[str]) -> dict: + """ + Parse log files and group events by machine. + + Args: + logdir: Directory containing log files + logfiles: List of log file names to parse + + Returns: + Dictionary mapping machine names to list of [datetime, action] pairs + """ + db = {} + + for fname in logfiles[-2:]: # Only process last 2 files + filepath = os.path.join(logdir, fname) + + try: + with open(filepath) as f: + for line in f: + result = parse_log_line(line) + if result: + host, date, action = result + if host in db: + db[host].append([date, action]) + else: + db[host] = [[date, action]] + except (IOError, OSError): + # Skip files that can't be opened + continue + + return db diff --git a/attendance_app/processor.py b/attendance_app/processor.py new file mode 100644 index 0000000..4d95465 --- /dev/null +++ b/attendance_app/processor.py @@ -0,0 +1,220 @@ +"""Attendance data processing and state machine logic.""" + +from datetime import datetime, timedelta +import datetime as dt +from typing import List, Dict, Callable, Optional + + +class AttendanceRecord: + """Represents a single attendance record.""" + + def __init__(self, machine: str, login_time: str, logout_time: str, + action: str, user_id: str, student_no: str, name: str, + reading: str, department: str, pos_code: str, status: str): + self.machine = machine + self.login_time = login_time + self.logout_time = logout_time + self.action = action + self.user_id = user_id + self.student_no = student_no + self.name = name + self.reading = reading + self.department = department + self.pos_code = pos_code + self.status = status + + def to_list(self) -> List: + """Convert record to list format for JSON output.""" + return [ + self.machine, self.login_time, self.logout_time, self.action, + self.user_id, self.student_no, self.name, self.reading, + self.department, self.pos_code, self.status + ] + + +class AttendanceProcessor: + """Process log events to generate attendance records.""" + + def __init__(self, user_lookup_func: Callable[[str], Optional[tuple]], + datefmt: str = "%H:%M"): + """ + Initialize processor with user lookup function. + + Args: + user_lookup_func: Function that takes user_id and returns + (number, name_kanji, name_kana, depart_name, pos_code) + or None if not found + datefmt: Date format string for output times + """ + self.user_lookup = user_lookup_func + self.datefmt = datefmt + + def process_events(self, db: Dict[str, List], from_time: datetime, + to_time: datetime) -> List[List]: + """ + Process machine events and generate attendance records. + + Args: + db: Dictionary mapping machine names to [datetime, action] pairs + from_time: Start of time range to check + to_time: End of time range to check + + Returns: + List of attendance records as lists + """ + lines = [] + + for machine in list(db.keys()): + candidate = [] + active = False + reject = False + + for date, action in db[machine]: + # Handle login events + if action.startswith("on:"): + # Remove previous incomplete entry if exists + if candidate and candidate[-1][2] == "---": + candidate.pop() + + action_type, id_ = action.split(":", 1) + id_ = id_.lower() + + if date < to_time: + date_str = date.strftime(self.datefmt) + active = True + user_data = self.user_lookup(id_) + + if user_data: + candidate.append([ + machine, date_str, "---", "on", id_, + user_data[0], user_data[1], user_data[2], + user_data[3], user_data[4], "" + ]) + else: + candidate.append([ + machine, date_str, "---", "on", id_, + "????", id_, "--", "--", "--", "" + ]) + + # Handle rejection (double-login) + elif action.startswith("reject") and candidate: + parts = action.split(":", 2) + if len(parts) >= 2: + _, id_, *_ = parts + if id_.lower() == candidate[-1][4]: + reject = True + candidate.pop() + + # Handle startup/shutdown + elif action.startswith("startup") and candidate: + action = "off:" + candidate[-1][4] + elif action.startswith("shutdown") and candidate: + action = "off:" + candidate[-1][4] + + # Handle logout events + if action.startswith("off:"): + action_type, id_ = action.split(":", 1) + + if reject: + reject = False + elif candidate: + # If logout is before start time, remove candidate + if not from_time < date: + candidate.pop() + # If logout matches active login + elif active and id_ == candidate[-1][4]: + active = False + candidate[-1][2] = date.strftime(self.datefmt) + + # Handle incomplete login time + if candidate[-1][1].startswith("?"): + candidate[-1][1] = "?〜" + date.strftime(self.datefmt) + else: + # Check for missing logoff record between specific hours + if (date.time() > dt.time(19, 55) and + date.time() > dt.time(18, 45)): + candidate[-1][1] += "〜記録無" + candidate[-1][10] = "ログオフ記録無" + else: + candidate[-1][1] += "〜" + date.strftime(self.datefmt) + + # Logout with no matching login (orphaned logout) + elif from_time < date < to_time: + to_entry = date.strftime(self.datefmt) + entry = date.strftime(self.datefmt) + user_data = self.user_lookup(id_) + + if user_data: + candidate.append([ + machine, "?〜" + entry, to_entry, "off", id_, + user_data[0], user_data[1], user_data[2], + user_data[3], user_data[4], "" + ]) + else: + candidate.append([ + machine, "?〜" + entry, "---", "off", id_, + "????", id_, "--", "--", "--", "" + ]) + + lines.extend(candidate) + + return lines + + def filter_by_rooms(self, records: List[List], room_filters: Dict[str, bool], + unique_users: bool = False) -> List[List]: + """ + Filter records by room selection. + + Args: + records: List of attendance records + room_filters: Dict of room names to boolean enabled status + unique_users: If True, remove duplicate entries for same user + + Returns: + Filtered list of records + """ + filtered = [] + prev_user = None + + IML_ROOM_A = [ + "01", "02", "03", "04", "05", "06", + "11", "12", "13", "14", "15", "16", + "21", "22", "23", "24", "25", "26", + "31", "32", "33", "34", "35", "36", + "41", "42", "43", "44", + "51", "52", "53", "54", + "K01", "K03", "61" + ] + + for record in records: + host = record[0] + user_id = record[4] + + if host is None: + continue + + accept = False + + # Check room filters + if host.startswith("GRL") and room_filters.get("GRL", False): + accept = True + elif host.startswith("ML1") and room_filters.get("ML1", False): + accept = True + elif host.startswith("ML2") and room_filters.get("ML2", False): + accept = True + elif host.startswith("IML"): + machine_num = host[4:] + in_room_a = machine_num in IML_ROOM_A + + if room_filters.get("IML-A", False) and in_room_a: + accept = True + if room_filters.get("IML-B", False) and not in_room_a: + accept = True + + # Apply unique filter + if accept: + if not unique_users or prev_user != user_id: + filtered.append(record) + prev_user = user_id + + return filtered diff --git a/attendance_app/tests/__init__.py b/attendance_app/tests/__init__.py new file mode 100644 index 0000000..b0635fb --- /dev/null +++ b/attendance_app/tests/__init__.py @@ -0,0 +1 @@ +"""Unit tests for attendance application.""" diff --git a/attendance_app/tests/test_database.py b/attendance_app/tests/test_database.py new file mode 100644 index 0000000..8f0d981 --- /dev/null +++ b/attendance_app/tests/test_database.py @@ -0,0 +1,127 @@ +"""Unit tests for database access layer.""" + +import unittest +import sqlite3 +import tempfile +import os +from attendance_app.database import SQLiteUserDatabase, MockUserDatabase + + +class TestMockUserDatabase(unittest.TestCase): + """Test MockUserDatabase class.""" + + def test_lookup_existing_user(self): + """Test looking up an existing user.""" + user_data = { + 'user123': ('S001', 'Taro Yamada', 'ヤマダ タロウ', 'CS', '01') + } + db = MockUserDatabase(user_data) + result = db.lookup_user('user123') + + self.assertIsNotNone(result) + self.assertEqual(result[0], 'S001') + self.assertEqual(result[1], 'Taro Yamada') + + def test_lookup_nonexistent_user(self): + """Test looking up a non-existent user.""" + user_data = {'user123': ('S001', 'Test', 'テスト', 'CS', '01')} + db = MockUserDatabase(user_data) + result = db.lookup_user('unknown') + + self.assertIsNone(result) + + +class TestSQLiteUserDatabase(unittest.TestCase): + """Test SQLiteUserDatabase class.""" + + def setUp(self): + """Create temporary test database.""" + self.temp_db = tempfile.NamedTemporaryFile(delete=False, suffix='.sqlite3') + self.db_path = self.temp_db.name + self.temp_db.close() + + # Create test database + conn = sqlite3.connect(self.db_path) + cursor = conn.cursor() + + cursor.execute(''' + CREATE TABLE user ( + id TEXT PRIMARY KEY, + number TEXT, + name_kanji TEXT, + name_kana TEXT, + depart_name TEXT, + pos_code TEXT + ) + ''') + + cursor.execute(''' + INSERT INTO user VALUES + ('user123', 'S001', 'Taro Yamada', 'ヤマダ タロウ', 'CS', '01'), + ('user456', 'S002', 'Hanako Tanaka', 'タナカ ハナコ', 'AM', '01'), + ('staff1', 'M001', 'Ken Sato', 'サトウ ケン', 'Staff', '02') + ''') + + conn.commit() + conn.close() + + def tearDown(self): + """Remove temporary database.""" + try: + os.unlink(self.db_path) + except: + pass + + def test_lookup_existing_user(self): + """Test looking up an existing user.""" + with SQLiteUserDatabase(self.db_path) as db: + result = db.lookup_user('user123') + + self.assertIsNotNone(result) + self.assertEqual(result[0], 'S001') + self.assertEqual(result[1], 'Taro Yamada') + self.assertEqual(result[2], 'ヤマダ タロウ') + self.assertEqual(result[3], 'CS') + self.assertEqual(result[4], '01') + + def test_lookup_multiple_users(self): + """Test looking up multiple users.""" + with SQLiteUserDatabase(self.db_path) as db: + result1 = db.lookup_user('user123') + result2 = db.lookup_user('user456') + result3 = db.lookup_user('staff1') + + self.assertEqual(result1[0], 'S001') + self.assertEqual(result2[0], 'S002') + self.assertEqual(result3[0], 'M001') + + def test_lookup_nonexistent_user(self): + """Test looking up a non-existent user.""" + with SQLiteUserDatabase(self.db_path) as db: + result = db.lookup_user('unknown') + + self.assertIsNone(result) + + def test_context_manager(self): + """Test that database is properly closed after context.""" + db = SQLiteUserDatabase(self.db_path) + + with db: + self.assertIsNotNone(db.conn) + self.assertIsNotNone(db.cursor) + + # After exiting context, connection should be closed + # Attempting to use it should fail + with self.assertRaises(Exception): + db.cursor.execute("SELECT 1") + + def test_lookup_without_context_manager(self): + """Test that lookup fails without context manager.""" + db = SQLiteUserDatabase(self.db_path) + + with self.assertRaises(RuntimeError): + db.lookup_user('user123') + + +if __name__ == '__main__': + unittest.main() diff --git a/attendance_app/tests/test_log_parser.py b/attendance_app/tests/test_log_parser.py new file mode 100644 index 0000000..aefcfa6 --- /dev/null +++ b/attendance_app/tests/test_log_parser.py @@ -0,0 +1,125 @@ +"""Unit tests for log parser.""" + +import unittest +from datetime import datetime +from attendance_app.log_parser import parse_log_line, get_log_files +import tempfile +import os + + +class TestParseLogLine(unittest.TestCase): + """Test parse_log_line function.""" + + def test_valid_logon_entry(self): + """Test parsing a valid logon entry.""" + line = '172.29.11.116.151 - - [27/Nov/2024:14:30:15 +0900] "GET /logonoff?on:user123 HTTP/1.1" 200 -' + result = parse_log_line(line) + + self.assertIsNotNone(result) + host, date, action = result + self.assertEqual(host, "GRL-01") + self.assertEqual(date, datetime(2024, 11, 27, 14, 30, 15)) + self.assertEqual(action, "on:user123") + + def test_valid_logoff_entry(self): + """Test parsing a valid logoff entry.""" + line = '172.29.11.117.152 - - [27/Nov/2024:17:00:30 +0900] "GET /logonoff?off:user456 HTTP/1.1" 200 -' + result = parse_log_line(line) + + self.assertIsNotNone(result) + host, date, action = result + self.assertEqual(host, "ML1-02") + self.assertEqual(action, "off:user456") + + def test_reject_entry(self): + """Test parsing a reject entry.""" + line = '172.29.11.118.151 - - [27/Nov/2024:15:00:00 +0900] "GET /logonoff?reject:user789:ML2-01 HTTP/1.1" 200 -' + result = parse_log_line(line) + + self.assertIsNotNone(result) + host, date, action = result + self.assertEqual(action, "reject:user789:ML2-01") + + def test_startup_shutdown(self): + """Test parsing startup/shutdown entries.""" + line1 = '172.29.11.119.151 - - [27/Nov/2024:09:00:00 +0900] "GET /logonoff?startup HTTP/1.1" 200 -' + result1 = parse_log_line(line1) + self.assertIsNotNone(result1) + self.assertEqual(result1[2], "startup") + + line2 = '172.29.11.119.151 - - [27/Nov/2024:19:00:00 +0900] "GET /logonoff?shutdown HTTP/1.1" 200 -' + result2 = parse_log_line(line2) + self.assertIsNotNone(result2) + self.assertEqual(result2[2], "shutdown") + + def test_non_logonoff_entry(self): + """Test that non-logonoff entries return None.""" + line = '172.29.11.116.151 - - [27/Nov/2024:14:30:15 +0900] "GET /index.html HTTP/1.1" 200 -' + result = parse_log_line(line) + self.assertIsNone(result) + + def test_invalid_format(self): + """Test invalid log format.""" + line = "invalid log line" + result = parse_log_line(line) + self.assertIsNone(result) + + def test_malformed_date(self): + """Test malformed date in log entry.""" + line = '172.29.11.116.151 - - [invalid/date] "GET /logonoff?on:user123 HTTP/1.1" 200 -' + result = parse_log_line(line) + self.assertIsNone(result) + + +class TestGetLogFiles(unittest.TestCase): + """Test get_log_files function.""" + + def setUp(self): + """Create temporary directory with test log files.""" + self.temp_dir = tempfile.mkdtemp() + + # Create test log files + self.log_files = [ + "access_log", + "access_log-20241120", + "access_log-20241125", + "access_log-20241127", + "access_log-20241130", + "access_log-20241205", + "other_file.txt" + ] + + for filename in self.log_files: + open(os.path.join(self.temp_dir, filename), 'w').close() + + def tearDown(self): + """Clean up temporary directory.""" + for filename in self.log_files: + try: + os.remove(os.path.join(self.temp_dir, filename)) + except: + pass + os.rmdir(self.temp_dir) + + def test_get_log_files_in_range(self): + """Test getting log files within date range.""" + start_date = datetime(2024, 11, 27) + result = get_log_files(self.temp_dir, start_date) + + # Should get files from 1127 to 1204 (7 days after) + self.assertIn("access_log-20241127", result) + self.assertIn("access_log-20241130", result) + self.assertNotIn("access_log-20241125", result) + self.assertNotIn("access_log-20241205", result) + self.assertNotIn("other_file.txt", result) + + def test_fallback_to_access_log(self): + """Test fallback to access_log when no dated files match.""" + start_date = datetime(2024, 10, 1) + result = get_log_files(self.temp_dir, start_date) + + self.assertIn("access_log", result) + + +if __name__ == '__main__': + unittest.main() diff --git a/attendance_app/tests/test_processor.py b/attendance_app/tests/test_processor.py new file mode 100644 index 0000000..bc761e2 --- /dev/null +++ b/attendance_app/tests/test_processor.py @@ -0,0 +1,234 @@ +"""Unit tests for attendance processor.""" + +import unittest +from datetime import datetime +from attendance_app.processor import AttendanceProcessor +from attendance_app.database import MockUserDatabase + + +class TestAttendanceProcessor(unittest.TestCase): + """Test AttendanceProcessor class.""" + + def setUp(self): + """Set up test fixtures.""" + # Mock user database + self.mock_users = { + 'user123': ('S001', 'Taro Yamada', 'ヤマダ タロウ', 'CS', '01'), + 'user456': ('S002', 'Hanako Tanaka', 'タナカ ハナコ', 'AM', '01'), + 'staff1': ('M001', 'Ken Sato', 'サトウ ケン', 'Staff', '02'), + } + self.db = MockUserDatabase(self.mock_users) + self.processor = AttendanceProcessor(self.db.lookup_user) + + def test_simple_login_logout(self): + """Test simple login and logout.""" + events = { + 'GRL-01': [ + [datetime(2024, 11, 27, 14, 30), 'on:user123'], + [datetime(2024, 11, 27, 16, 30), 'off:user123'], + ] + } + + from_time = datetime(2024, 11, 27, 14, 0) + to_time = datetime(2024, 11, 27, 17, 0) + + results = self.processor.process_events(events, from_time, to_time) + + self.assertEqual(len(results), 1) + record = results[0] + self.assertEqual(record[0], 'GRL-01') # machine + self.assertEqual(record[1], '14:30〜16:30') # time range + self.assertEqual(record[4], 'user123') # user_id + self.assertEqual(record[5], 'S001') # student_no + self.assertEqual(record[6], 'Taro Yamada') # name + + def test_login_without_logout(self): + """Test login without matching logout.""" + events = { + 'GRL-01': [ + [datetime(2024, 11, 27, 14, 30), 'on:user123'], + ] + } + + from_time = datetime(2024, 11, 27, 14, 0) + to_time = datetime(2024, 11, 27, 17, 0) + + results = self.processor.process_events(events, from_time, to_time) + + self.assertEqual(len(results), 1) + record = results[0] + self.assertEqual(record[2], '---') # logout time missing + + def test_orphaned_logout(self): + """Test logout without matching login.""" + events = { + 'GRL-01': [ + [datetime(2024, 11, 27, 16, 30), 'off:user123'], + ] + } + + from_time = datetime(2024, 11, 27, 14, 0) + to_time = datetime(2024, 11, 27, 17, 0) + + results = self.processor.process_events(events, from_time, to_time) + + self.assertEqual(len(results), 1) + record = results[0] + self.assertTrue(record[1].startswith('?〜')) # login time unknown + + def test_double_login_rejection(self): + """Test double login rejection.""" + events = { + 'GRL-01': [ + [datetime(2024, 11, 27, 14, 30), 'on:user123'], + [datetime(2024, 11, 27, 14, 35), 'reject:user123:GRL-02'], + ] + } + + from_time = datetime(2024, 11, 27, 14, 0) + to_time = datetime(2024, 11, 27, 17, 0) + + results = self.processor.process_events(events, from_time, to_time) + + # Login should be rejected (removed) + self.assertEqual(len(results), 0) + + def test_startup_as_implicit_logout(self): + """Test startup event as implicit logout.""" + events = { + 'GRL-01': [ + [datetime(2024, 11, 27, 14, 30), 'on:user123'], + [datetime(2024, 11, 27, 16, 0), 'startup'], + ] + } + + from_time = datetime(2024, 11, 27, 14, 0) + to_time = datetime(2024, 11, 27, 17, 0) + + results = self.processor.process_events(events, from_time, to_time) + + self.assertEqual(len(results), 1) + record = results[0] + self.assertEqual(record[2], '16:00') # logout from startup + + def test_shutdown_as_implicit_logout(self): + """Test shutdown event as implicit logout.""" + events = { + 'GRL-01': [ + [datetime(2024, 11, 27, 14, 30), 'on:user123'], + [datetime(2024, 11, 27, 18, 0), 'shutdown'], + ] + } + + from_time = datetime(2024, 11, 27, 14, 0) + to_time = datetime(2024, 11, 27, 19, 0) + + results = self.processor.process_events(events, from_time, to_time) + + self.assertEqual(len(results), 1) + record = results[0] + self.assertEqual(record[2], '18:00') + + def test_unknown_user(self): + """Test handling of unknown user.""" + events = { + 'GRL-01': [ + [datetime(2024, 11, 27, 14, 30), 'on:unknown'], + ] + } + + from_time = datetime(2024, 11, 27, 14, 0) + to_time = datetime(2024, 11, 27, 17, 0) + + results = self.processor.process_events(events, from_time, to_time) + + self.assertEqual(len(results), 1) + record = results[0] + self.assertEqual(record[5], '????') # unknown student number + self.assertEqual(record[6], 'unknown') # user_id as name + + def test_multiple_machines(self): + """Test events from multiple machines.""" + events = { + 'GRL-01': [ + [datetime(2024, 11, 27, 14, 30), 'on:user123'], + [datetime(2024, 11, 27, 16, 30), 'off:user123'], + ], + 'ML1-05': [ + [datetime(2024, 11, 27, 15, 0), 'on:user456'], + [datetime(2024, 11, 27, 16, 0), 'off:user456'], + ] + } + + from_time = datetime(2024, 11, 27, 14, 0) + to_time = datetime(2024, 11, 27, 17, 0) + + results = self.processor.process_events(events, from_time, to_time) + + self.assertEqual(len(results), 2) + machines = [r[0] for r in results] + self.assertIn('GRL-01', machines) + self.assertIn('ML1-05', machines) + + +class TestFilterByRooms(unittest.TestCase): + """Test room filtering.""" + + def setUp(self): + """Set up test fixtures.""" + mock_users = {'user123': ('S001', 'Test User', 'テスト', 'CS', '01')} + self.db = MockUserDatabase(mock_users) + self.processor = AttendanceProcessor(self.db.lookup_user) + + def test_filter_grl_room(self): + """Test filtering by GRL room.""" + records = [ + ['GRL-01', '14:30', '16:30', 'on', 'user123', 'S001', 'Test', 'テスト', 'CS', '01', ''], + ['ML1-01', '14:30', '16:30', 'on', 'user123', 'S001', 'Test', 'テスト', 'CS', '01', ''], + ] + + room_filters = {'GRL': True, 'ML1': False, 'ML2': False, 'IML-A': False, 'IML-B': False} + filtered = self.processor.filter_by_rooms(records, room_filters) + + self.assertEqual(len(filtered), 1) + self.assertEqual(filtered[0][0], 'GRL-01') + + def test_filter_iml_subrooms(self): + """Test filtering IML sub-rooms.""" + records = [ + ['IML-01', '14:30', '16:30', 'on', 'user123', 'S001', 'Test', 'テスト', 'CS', '01', ''], # Room A + ['IML-62', '14:30', '16:30', 'on', 'user123', 'S001', 'Test', 'テスト', 'CS', '01', ''], # Room B + ] + + # Test IML-A only + room_filters = {'GRL': False, 'ML1': False, 'ML2': False, 'IML-A': True, 'IML-B': False} + filtered = self.processor.filter_by_rooms(records, room_filters) + self.assertEqual(len(filtered), 1) + self.assertEqual(filtered[0][0], 'IML-01') + + # Test IML-B only + room_filters = {'GRL': False, 'ML1': False, 'ML2': False, 'IML-A': False, 'IML-B': True} + filtered = self.processor.filter_by_rooms(records, room_filters) + self.assertEqual(len(filtered), 1) + self.assertEqual(filtered[0][0], 'IML-62') + + def test_unique_users_filter(self): + """Test unique users filtering.""" + records = [ + ['GRL-01', '14:30', '15:30', 'on', 'user123', 'S001', 'Test', 'テスト', 'CS', '01', ''], + ['GRL-02', '15:45', '16:30', 'on', 'user123', 'S001', 'Test', 'テスト', 'CS', '01', ''], + ] + + room_filters = {'GRL': True, 'ML1': False, 'ML2': False, 'IML-A': False, 'IML-B': False} + + # Without unique filter + filtered = self.processor.filter_by_rooms(records, room_filters, unique_users=False) + self.assertEqual(len(filtered), 2) + + # With unique filter + filtered = self.processor.filter_by_rooms(records, room_filters, unique_users=True) + self.assertEqual(len(filtered), 1) + + +if __name__ == '__main__': + unittest.main() diff --git a/attendance_app/tests/test_utils.py b/attendance_app/tests/test_utils.py new file mode 100644 index 0000000..cdc16ee --- /dev/null +++ b/attendance_app/tests/test_utils.py @@ -0,0 +1,55 @@ +"""Unit tests for utility functions.""" + +import unittest +from attendance_app.utils import machine_name + + +class TestMachineName(unittest.TestCase): + """Test machine_name function.""" + + def test_grl_room(self): + """Test GRL room IP mapping.""" + self.assertEqual(machine_name("172.29.11.116.151"), "GRL-01") + self.assertEqual(machine_name("172.29.11.116.152"), "GRL-02") + self.assertEqual(machine_name("172.29.11.116.199"), "GRL-49") + + def test_grl_alternate_range(self): + """Test GRL alternate IP range.""" + self.assertEqual(machine_name("172.29.11.116.81"), "GRL-01") + self.assertEqual(machine_name("172.29.11.116.82"), "GRL-02") + + def test_ml1_room(self): + """Test ML1 room IP mapping.""" + self.assertEqual(machine_name("172.29.11.117.151"), "ML1-01") + self.assertEqual(machine_name("172.29.11.117.160"), "ML1-10") + + def test_ml2_room(self): + """Test ML2 room IP mapping.""" + self.assertEqual(machine_name("172.29.11.118.151"), "ML2-01") + + def test_iml_room(self): + """Test IML room IP mapping.""" + self.assertEqual(machine_name("172.29.11.119.151"), "IML-01") + self.assertEqual(machine_name("172.29.11.119.213"), "IML-63") + + def test_teacher_stations(self): + """Test teacher station IP mapping.""" + self.assertEqual(machine_name("172.29.11.116.220"), "GRL-T00") + self.assertEqual(machine_name("172.29.11.116.221"), "GRL-T01") + self.assertEqual(machine_name("172.29.11.116.226"), "GRL-T01") + self.assertEqual(machine_name("172.29.11.117.220"), "ML1-T00") + + def test_invalid_ip(self): + """Test invalid IP addresses.""" + self.assertEqual(machine_name("192.168.1.1"), "192.168.1.1") + self.assertEqual(machine_name("172.29.11.120.151"), "172.29.11.120.151") + self.assertEqual(machine_name("172.29.11.116.50"), "172.29.11.116.50") + + def test_edge_cases(self): + """Test edge cases.""" + self.assertEqual(machine_name("172.29.11.116.150"), "172.29.11.116.150") + self.assertEqual(machine_name("172.29.11.116.200"), "172.29.11.116.200") + + +if __name__ == '__main__': + unittest.main() diff --git a/attendance_app/utils.py b/attendance_app/utils.py new file mode 100644 index 0000000..d8201d8 --- /dev/null +++ b/attendance_app/utils.py @@ -0,0 +1,50 @@ +"""Utility functions for attendance application.""" + + +def machine_name(ip_str): + """ + Convert IP address to machine name based on lab room conventions. + + Args: + ip_str: IP address string (e.g., "172.29.11.116.151") + + Returns: + Machine name (e.g., "GRL-01") or original IP if no match + """ + ip = [int(v) for v in ip_str.split(".")] + + if not ip_str.startswith("172.29.11"): + return ip_str + + # Determine room prefix based on fourth octet (index 3) + if ip[3] == 116: + prefix = "GRL" + elif ip[3] == 117: + prefix = "ML1" + elif ip[3] == 118: + prefix = "ML2" + elif ip[3] == 119: + prefix = "IML" + else: + return ip_str + + # Determine machine number based on fifth octet (index 4) + last_octet = ip[4] + + # Check regular workstation ranges + if (prefix != "IML" and 151 <= last_octet <= 150 + 49) or \ + (prefix == "IML" and 151 <= last_octet <= 150 + 63): + return "%s-%02d" % (prefix, last_octet - 150) + + # Check alternate workstation ranges + if (prefix != "IML" and 81 <= last_octet <= 81 + 49) or \ + (prefix == "IML" and 81 <= last_octet <= 81 + 63): + return "%s-%02d" % (prefix, last_octet - 80) + + # Check teacher station ranges + if 220 <= last_octet <= 220 + 5: + return "%s-T%02d" % (prefix, last_octet - 220) + if 226 <= last_octet <= 226 + 5: + return "%s-T%02d" % (prefix, last_octet - 225) + + return ip_str diff --git a/call_refactored.cgi b/call_refactored.cgi new file mode 100755 index 0000000..962c1e8 --- /dev/null +++ b/call_refactored.cgi @@ -0,0 +1,141 @@ +#!/usr/bin/env python3 +# encoding: utf-8 +""" +Refactored attendance tracking CGI script. + +This script parses Apache access logs to track computer lab attendance. +""" +import os +import sys +import cgi +import json +from datetime import datetime + +# Add the parent directory to the path to import our modules +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +from attendance_app.log_parser import get_log_files, parse_log_files +from attendance_app.database import SQLiteUserDatabase +from attendance_app.processor import AttendanceProcessor +from attendance_app.excel_output import ExcelGenerator + + +def parse_form_parameters(form): + """ + Parse CGI form parameters. + + Args: + form: CGI FieldStorage object + + Returns: + Dictionary with parsed parameters + """ + params = { + 'date': form.getvalue('date'), + 'from': form.getvalue('from'), + 'to': form.getvalue('to'), + 'excel': 'excel' in form, + 'rooms': { + 'GRL': 'GRL' in form, + 'ML1': 'ML1' in form, + 'ML2': 'ML2' in form, + 'IML': 'IML' in form, + 'IML-A': 'IML-A' in form, + 'IML-B': 'IML-B' in form, + } + } + return params + + +def main(): + """Main CGI entry point.""" + # Determine log directory based on test mode + if sys.argv[-1] == "--test": + logdir = "./" + db_path = "./kguid_test.sqlite3" # Use test DB in test mode + template_path = "./attendancebook_template.xlsx" + else: + logdir = "/var/log/httpd/" + db_path = "/var/www/etc/kguid.sqlite3" + template_path = "./attendancebook_template.xlsx" + + # Parse form parameters + form = cgi.FieldStorage() + params = parse_form_parameters(form) + + # Parse date/time parameters + try: + from_time = datetime.strptime( + params['date'] + " " + params['from'], + "%Y/%m/%d %H:%M" + ) + to_time = datetime.strptime( + params['date'] + " " + params['to'], + "%Y/%m/%d %H:%M" + ) + except (ValueError, TypeError) as e: + print("Content-Type: text/plain") + print("Status: 400 Bad Request") + print("") + print(f"Invalid date/time parameters: {e}") + return + + # Get relevant log files + logfiles = get_log_files(logdir, from_time) + + # Parse log files + db = parse_log_files(logdir, logfiles) + + # Process attendance records + with SQLiteUserDatabase(db_path) as user_db: + processor = AttendanceProcessor(user_db.lookup_user) + records = processor.process_events(db, from_time, to_time) + + # Output results + if not params['excel']: + # JSON output + print("Content-Type: application/json") + print("") + json.dump(records, sys.stdout) + else: + # Excel output + try: + # Add openpyxl to path if in production + if sys.argv[-1] != "--test": + sys.path.append("/home/tkuro/.local/lib/python3.9/site-packages/") + + generator = ExcelGenerator(template_path) + temp_file = generator.generate(records, from_time, to_time, params['rooms']) + + # Send Excel file + filename = "attendancebook" + from_time.strftime("%Y%m%d%H%M") + ".xlsx" + with open(temp_file, "rb") as f: + finfo = os.stat(temp_file) + sys.stdout.buffer.write( + b"Content-Type: application/vnd.openxmlformats-officedocument.spreadsheetml.sheet\n" + ) + sys.stdout.buffer.write( + f'Content-Disposition: attachment; filename="{filename}"\n'.encode("ascii") + ) + sys.stdout.buffer.write( + f'Content-Length: {finfo.st_size}\n\n'.encode("ascii") + ) + sys.stdout.buffer.write(f.read()) + + # Clean up temp file + os.unlink(temp_file) + + except ImportError as e: + print("Content-Type: text/plain") + print("Status: 500 Internal Server Error") + print("") + print(f"Excel generation not available: {e}") + except Exception as e: + print("Content-Type: text/plain") + print("Status: 500 Internal Server Error") + print("") + print(f"Error generating Excel: {e}") + + +if __name__ == "__main__": + main() diff --git a/run_tests.py b/run_tests.py new file mode 100755 index 0000000..9480184 --- /dev/null +++ b/run_tests.py @@ -0,0 +1,50 @@ +#!/usr/bin/env python3 +""" +Run all unit tests for the attendance application. + +Usage: + python3 run_tests.py # Run all tests + python3 run_tests.py test_utils # Run specific test module + python3 run_tests.py -v # Verbose output +""" + +import sys +import unittest +import os + +# Add the parent directory to the path +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + + +def run_all_tests(): + """Run all unit tests.""" + loader = unittest.TestLoader() + start_dir = 'attendance_app/tests' + suite = loader.discover(start_dir, pattern='test_*.py') + + runner = unittest.TextTestRunner(verbosity=2) + result = runner.run(suite) + + return 0 if result.wasSuccessful() else 1 + + +def run_specific_test(test_name): + """Run a specific test module.""" + loader = unittest.TestLoader() + suite = loader.loadTestsFromName(f'attendance_app.tests.{test_name}') + + runner = unittest.TextTestRunner(verbosity=2) + result = runner.run(suite) + + return 0 if result.wasSuccessful() else 1 + + +if __name__ == '__main__': + if len(sys.argv) > 1 and not sys.argv[1].startswith('-'): + # Run specific test + exit_code = run_specific_test(sys.argv[1]) + else: + # Run all tests + exit_code = run_all_tests() + + sys.exit(exit_code)