"""Attendance data processing and state machine logic."""
from datetime import datetime, timedelta
import datetime as dt
from typing import List, Dict, Callable, Optional
class AttendanceRecord:
    """Hold the eleven fields that make up one attendance row.

    All constructor arguments are stored unchanged as same-named attributes;
    :meth:`to_list` flattens them back into positional order.
    """

    def __init__(self, machine: str, login_time: str, logout_time: str,
                 action: str, user_id: str, student_no: str, name: str,
                 reading: str, department: str, pos_code: str, status: str):
        """Store every argument on the instance under its own name."""
        # Session part: which machine, when, and which kind of event.
        self.machine, self.login_time, self.logout_time = (
            machine, login_time, logout_time)
        self.action = action
        # Person part.
        self.user_id, self.student_no = user_id, student_no
        self.name, self.reading = name, reading
        # Affiliation part plus the free-form status column.
        self.department, self.pos_code = department, pos_code
        self.status = status

    def to_list(self) -> List:
        """Return the fields as a flat list, in constructor-argument order."""
        session = [self.machine, self.login_time, self.logout_time,
                   self.action]
        person = [self.user_id, self.student_no, self.name, self.reading]
        affiliation = [self.department, self.pos_code, self.status]
        return session + person + affiliation
class AttendanceProcessor:
    """Turn per-machine log events into attendance rows and filter them.

    Every row produced here is an 11-item list laid out as
    ``[machine, login, logout, action, user_id, number, name_kanji,
    name_kana, depart_name, pos_code, status]``.
    """

    # A matched logoff stamped strictly inside this window is reported as
    # "no logoff record" instead of showing the logoff time.
    _NO_LOGOFF_WINDOW = (dt.time(18, 45), dt.time(19, 55))

    # Host-name prefixes whose room filter key equals the prefix itself.
    _PREFIX_ROOMS = ("GRL", "ML1", "ML2")

    # Machine numbers (the host name from index 4 on) of IML hosts that
    # belong to room "IML-A"; every other IML host counts as "IML-B".
    _IML_ROOM_A = frozenset((
        "01", "02", "03", "04", "05", "06",
        "11", "12", "13", "14", "15", "16",
        "21", "22", "23", "24", "25", "26",
        "31", "32", "33", "34", "35", "36",
        "41", "42", "43", "44",
        "51", "52", "53", "54",
        "K01", "K03", "61",
    ))

    def __init__(self, user_lookup_func: Callable[[str], Optional[tuple]],
                 datefmt: str = "%H:%M"):
        """
        Initialize processor with user lookup function.

        Args:
            user_lookup_func: Function that takes user_id and returns
                (number, name_kanji, name_kana, depart_name, pos_code)
                or None if not found
            datefmt: Date format string for output times
        """
        self.user_lookup = user_lookup_func
        self.datefmt = datefmt

    def _build_row(self, machine: str, login: str, logout: str,
                   action: str, user_id: str) -> List:
        """Build one row, filling the user columns from ``user_lookup``.

        When the lookup returns a falsy value the user columns are filled
        with placeholders and the user id doubles as the display name.
        """
        user_data = self.user_lookup(user_id)
        if user_data:
            profile = [user_data[0], user_data[1], user_data[2],
                       user_data[3], user_data[4]]
        else:
            profile = ["????", user_id, "--", "--", "--"]
        return [machine, login, logout, action, user_id] + profile + [""]

    def process_events(self, db: Dict[str, List], from_time: datetime,
                       to_time: datetime) -> List[List]:
        """
        Process machine events and generate attendance records.

        Recognized actions are ``on:<id>``, ``off:<id>``, ``reject:<id>``,
        and anything starting with ``startup`` or ``shutdown``. The latter
        two are treated as a logoff of the machine's most recent row. User
        ids are compared case-insensitively (lower-cased) for login, reject
        and logoff alike.

        Args:
            db: Dictionary mapping machine names to [datetime, action] pairs
            from_time: Start of time range to check
            to_time: End of time range to check

        Returns:
            List of attendance records as lists, grouped by machine in the
            iteration order of ``db``
        """
        window_start, window_end = self._NO_LOGOFF_WINDOW
        lines = []
        for machine, events in db.items():
            candidate = []   # rows collected for this machine so far
            active = False   # True between a recorded login and its logoff
            reject = False   # True right after a rejected login was dropped
            for date, action in events:
                if action.startswith("on:"):
                    # A new login supersedes a row that never got a logoff.
                    if candidate and candidate[-1][2] == "---":
                        candidate.pop()
                    user_id = action.split(":", 1)[1].lower()
                    if date < to_time:
                        active = True
                        candidate.append(self._build_row(
                            machine, date.strftime(self.datefmt), "---",
                            "on", user_id))
                elif action.startswith("reject") and candidate:
                    # Rejected (double) login: drop the row it just created
                    # and remember to swallow the logoff that follows.
                    parts = action.split(":", 2)
                    if len(parts) >= 2 and parts[1].lower() == candidate[-1][4]:
                        reject = True
                        candidate.pop()
                elif action.startswith(("startup", "shutdown")) and candidate:
                    # Machine start/stop ends whatever session is on record.
                    action = "off:" + candidate[-1][4]

                if not action.startswith("off:"):
                    continue

                # Lower-case like the login side so "off:ALICE" closes a
                # session opened by "on:alice".
                user_id = action.split(":", 1)[1].lower()
                stamp = date.strftime(self.datefmt)
                if reject:
                    reject = False
                elif candidate:
                    last = candidate[-1]
                    if not from_time < date:
                        # Logoff at or before the range start: the session
                        # is outside the range, so drop its row.
                        candidate.pop()
                    elif active and user_id == last[4]:
                        active = False
                        last[2] = stamp
                        if last[1].startswith("?"):
                            # Login time unknown; keep the "?" marker.
                            last[1] = "?〜" + stamp
                        elif window_start < date.time() < window_end:
                            last[1] += "〜記録無"
                            last[10] = "ログオフ記録無"
                        else:
                            last[1] += "〜" + stamp
                elif from_time < date < to_time:
                    # Logoff with no row on record for this machine.
                    # NOTE(review): this branch runs only when no candidate
                    # row exists; a mismatched logoff while a row is pending
                    # is ignored — confirm that this pairing is intended.
                    candidate.append(self._build_row(
                        machine, "?〜" + stamp, stamp, "off", user_id))
            lines.extend(candidate)
        return lines

    def _room_enabled(self, host: str, room_filters: Dict[str, bool]) -> bool:
        """Return True when ``host`` belongs to a room enabled in the filter."""
        for prefix in self._PREFIX_ROOMS:
            if host.startswith(prefix):
                return bool(room_filters.get(prefix, False))
        if host.startswith("IML"):
            # Skip "IML" plus one more character to get the machine number.
            in_room_a = host[4:] in self._IML_ROOM_A
            room = "IML-A" if in_room_a else "IML-B"
            return bool(room_filters.get(room, False))
        return False

    def filter_by_rooms(self, records: List[List], room_filters: Dict[str, bool],
                        unique_users: bool = False) -> List[List]:
        """
        Filter records by room selection.

        Args:
            records: List of attendance records
            room_filters: Dict of room names ("GRL", "ML1", "ML2", "IML-A",
                "IML-B") to boolean enabled status; missing keys count as
                disabled
            unique_users: If True, drop a record whose user id equals that
                of the previously accepted record (consecutive duplicates
                only)

        Returns:
            Filtered list of records, in input order
        """
        filtered = []
        prev_user = None
        for record in records:
            host = record[0]
            user_id = record[4]
            if host is None:
                continue
            if not self._room_enabled(host, room_filters):
                continue
            # NOTE(review): prev_user advances only on records that pass the
            # room filter — confirm against the intended behaviour.
            if not unique_users or prev_user != user_id:
                filtered.append(record)
            prev_user = user_id
        return filtered