# attend-cgi / attendance_app / processor.py
"""Attendance data processing and state machine logic."""

from datetime import datetime, timedelta
import datetime as dt
from typing import List, Dict, Callable, Optional


class AttendanceRecord:
    """Hold the eleven string fields that make up one attendance row."""

    def __init__(self, machine: str, login_time: str, logout_time: str,
                 action: str, user_id: str, student_no: str, name: str,
                 reading: str, department: str, pos_code: str, status: str):
        """Store every constructor argument on the instance unchanged."""
        # Session columns.
        self.machine, self.login_time, self.logout_time, self.action = (
            machine, login_time, logout_time, action,
        )
        # User columns.
        self.user_id, self.student_no, self.name, self.reading = (
            user_id, student_no, name, reading,
        )
        # Affiliation columns and the trailing status text.
        self.department, self.pos_code, self.status = (
            department, pos_code, status,
        )

    def to_list(self) -> List:
        """Return the fields as a flat list, in constructor-argument order.

        The list form is what gets serialised to JSON.
        """
        column_order = (
            "machine", "login_time", "logout_time", "action",
            "user_id", "student_no", "name", "reading",
            "department", "pos_code", "status",
        )
        return [getattr(self, column) for column in column_order]


class AttendanceProcessor:
    """Process log events to generate attendance records.

    Every record produced or filtered here is a plain 11-element list:
    ``[machine, login, logout, action, user_id, number, name_kanji,
    name_kana, depart_name, pos_code, status]``.
    """

    # A logout whose time of day lies strictly inside this (start, end)
    # window is reported as "no logoff record" instead of showing the time.
    # NOTE(review): presumably the forced evening shutdown window - confirm.
    NO_LOGOFF_WINDOW = (dt.time(18, 45), dt.time(19, 55))

    # Machine numbers (the part of an "IML" host name after its 4th
    # character) that belong to room A; every other IML machine is room B.
    _IML_ROOM_A = frozenset((
        "01", "02", "03", "04", "05", "06",
        "11", "12", "13", "14", "15", "16",
        "21", "22", "23", "24", "25", "26",
        "31", "32", "33", "34", "35", "36",
        "41", "42", "43", "44",
        "51", "52", "53", "54",
        "K01", "K03", "61",
    ))

    def __init__(self, user_lookup_func: Callable[[str], Optional[tuple]],
                 datefmt: str = "%H:%M"):
        """
        Initialize processor with user lookup function.

        Args:
            user_lookup_func: Function that takes user_id and returns
                             (number, name_kanji, name_kana, depart_name, pos_code)
                             or None if not found
            datefmt: ``strftime`` format string for output times
        """
        self.user_lookup = user_lookup_func
        self.datefmt = datefmt

    def _make_entry(self, machine: str, login: str, logout: str,
                    action: str, user_id: str) -> List:
        """Build one record row, filling user columns from the lookup.

        Unknown users (lookup returned a falsy value) get placeholder
        columns, with the user id standing in for the name.
        """
        user_data = self.user_lookup(user_id)
        if user_data:
            return [
                machine, login, logout, action, user_id,
                user_data[0], user_data[1], user_data[2],
                user_data[3], user_data[4], "",
            ]
        return [
            machine, login, logout, action, user_id,
            "????", user_id, "--", "--", "--", "",
        ]

    def process_events(self, db: Dict[str, List], from_time: datetime,
                       to_time: datetime) -> List[List]:
        """
        Process machine events and generate attendance records.

        Each machine's events are replayed in the order given. Recognised
        actions are ``on:<id>``, ``off:<id>``, ``reject[:<id>...]``,
        ``startup...`` and ``shutdown...``; a startup/shutdown while a row
        exists is treated as a logout of that row's user. User ids are
        compared case-insensitively (they are lower-cased on input).

        Args:
            db: Dictionary mapping machine names to [datetime, action] pairs
            from_time: Start of time range to check
            to_time: End of time range to check

        Returns:
            List of attendance records as lists
        """
        window_start, window_end = self.NO_LOGOFF_WINDOW
        lines = []

        for machine, events in db.items():
            candidate = []   # rows collected for this machine
            active = False   # a login is waiting for its logout
            reject = False   # the next logout belongs to a rejected login

            for date, action in events:
                if action.startswith("on:"):
                    # A new login supersedes a previous row that never
                    # received a logout time.
                    if candidate and candidate[-1][2] == "---":
                        candidate.pop()

                    user_id = action.split(":", 1)[1].lower()

                    if date < to_time:
                        active = True
                        candidate.append(self._make_entry(
                            machine, date.strftime(self.datefmt), "---",
                            "on", user_id,
                        ))

                elif action.startswith("reject") and candidate:
                    # Double-login rejection: drop the login it refers to
                    # and remember to swallow the matching logout.
                    parts = action.split(":", 2)
                    if len(parts) >= 2 and parts[1].lower() == candidate[-1][4]:
                        reject = True
                        candidate.pop()

                elif action.startswith(("startup", "shutdown")) and candidate:
                    # Machine restart/power-off ends the latest row's session.
                    action = "off:" + candidate[-1][4]

                # Deliberately a separate ``if``: startup/shutdown above may
                # have just rewritten ``action`` into a logout.
                if not action.startswith("off:"):
                    continue

                # Lower-case like the login and reject paths, otherwise
                # "on:ABC" followed by "off:ABC" would never match.
                user_id = action.split(":", 1)[1].lower()

                if reject:
                    reject = False
                elif candidate:
                    last = candidate[-1]
                    if not from_time < date:
                        # Logout is not after the range start: drop the row.
                        candidate.pop()
                    elif active and user_id == last[4]:
                        active = False
                        stamp = date.strftime(self.datefmt)
                        last[2] = stamp

                        if last[1].startswith("?"):
                            # Login time unknown: show only the logout time.
                            last[1] = "?〜" + stamp
                        elif window_start < date.time() < window_end:
                            last[1] += "〜記録無"
                            last[10] = "ログオフ記録無"
                        else:
                            last[1] += "〜" + stamp
                elif from_time < date < to_time:
                    # Logout with no login on record for this machine. The
                    # logout time is stored for known and unknown users
                    # alike, so a later login does not discard the row as
                    # "incomplete".
                    stamp = date.strftime(self.datefmt)
                    candidate.append(self._make_entry(
                        machine, "?〜" + stamp, stamp, "off", user_id,
                    ))

            lines.extend(candidate)

        return lines

    def filter_by_rooms(self, records: List[List], room_filters: Dict[str, bool],
                       unique_users: bool = False) -> List[List]:
        """
        Filter records by room selection.

        Hosts are matched by name prefix: ``GRL``, ``ML1`` and ``ML2`` map
        to the filter key of the same name; ``IML`` hosts map to ``IML-A``
        or ``IML-B`` depending on their machine number. Records whose host
        is ``None`` are skipped.

        Args:
            records: List of attendance records
            room_filters: Dict of room names to boolean enabled status;
                missing keys count as disabled
            unique_users: If True, drop an accepted record whose user id
                equals that of the previously accepted record (only
                consecutive duplicates are removed)

        Returns:
            Filtered list of records
        """
        filtered = []
        prev_user = None

        for record in records:
            host = record[0]
            user_id = record[4]

            if host is None:
                continue

            if host.startswith("IML"):
                # Skip the 3-letter prefix plus one separator character.
                in_room_a = host[4:] in self._IML_ROOM_A
                accept = room_filters.get("IML-A" if in_room_a else "IML-B", False)
            else:
                accept = any(
                    host.startswith(prefix) and room_filters.get(prefix, False)
                    for prefix in ("GRL", "ML1", "ML2")
                )

            if accept:
                if not unique_users or prev_user != user_id:
                    filtered.append(record)
                # Updated even when the record was dropped as a duplicate.
                prev_user = user_id

        return filtered