"""Log parsing functions for attendance tracking."""
import os
import re
from datetime import datetime, timedelta
from typing import List, Tuple
from .utils import machine_name
def get_log_files(logdir: str, start_date: datetime) -> List[str]:
    """
    Get the names of the log files relevant to a one-week window.

    Rotated logs are matched by the name pattern ``access_log-YYYYMMDD``.
    A file is selected when its name compares (as a string) between the
    name built from ``start_date`` and the name built from
    ``start_date`` + 7 days, both bounds inclusive.

    Args:
        logdir: Directory containing log files
        start_date: Start date for log search

    Returns:
        List of log file names (not full paths), sorted ascending so the
        newest date stamps come last. If no file falls inside the window,
        the list is ``['access_log']``.

    Raises:
        OSError: If ``logdir`` cannot be listed.
    """
    tail = start_date + timedelta(days=7)  # a week after
    lower = f"access_log-{start_date.year:04d}{start_date.month:02d}{start_date.day:02d}"
    higher = f"access_log-{tail.year:04d}{tail.month:02d}{tail.day:02d}"

    # os.listdir() yields names in arbitrary order; sort so that callers
    # slicing the result (e.g. "the last two files") get a deterministic,
    # chronologically ordered answer.
    # NOTE: the comparison is purely lexicographic, so a name that carries a
    # suffix after the final day's stamp sorts above `higher` and is excluded.
    logfiles = sorted(
        fname for fname in os.listdir(logdir)
        if fname.startswith("access_log-") and lower <= fname <= higher
    )

    if not logfiles:
        # Nothing in the window: fall back to the un-dated log name.
        logfiles.append('access_log')
    return logfiles
def parse_log_line(line: str) -> Tuple[str, datetime, str] | None:
"""
Parse a single log line for logon/logoff events.
Args:
line: Raw log line
Returns:
Tuple of (machine_name, datetime, action) or None if line doesn't match
"""
if "logonoff" not in line:
return None
match = re.match(
r'^([0-9.]*).*\[(\d+/\w+/\d+:\d+:\d+:\d+).*logonoff\?(.*) HTTP',
line
)
if not match:
return None
ip, date_str, action = match.groups()
try:
date = datetime.strptime(date_str, '%d/%b/%Y:%H:%M:%S')
host = machine_name(ip)
return (host, date, action)
except ValueError:
return None
def parse_log_files(logdir: str, logfiles: List[str]) -> dict:
    """
    Parse log files and group events by machine.

    Only the last two entries of ``logfiles`` are read. Files that cannot
    be opened or read are skipped; events collected before a read error are
    kept.

    Args:
        logdir: Directory containing log files
        logfiles: List of log file names to parse

    Returns:
        Dictionary mapping machine names to list of [datetime, action] pairs,
        in the order the events were encountered
    """
    events_by_host = {}
    for fname in logfiles[-2:]:  # Only process last 2 files
        filepath = os.path.join(logdir, fname)
        try:
            # errors="replace": a stray undecodable byte would otherwise
            # raise UnicodeDecodeError (a ValueError, not an OSError) and
            # abort the whole parse instead of affecting a single line.
            with open(filepath, errors="replace") as f:
                for line in f:
                    result = parse_log_line(line)
                    if result is None:
                        continue
                    host, date, action = result
                    events_by_host.setdefault(host, []).append([date, action])
        except OSError:
            # Skip files that can't be opened or read
            continue
    return events_by_host