# attend-cgi / attendance_app / log_parser.py
"""Log parsing functions for attendance tracking."""

import os
import re
from datetime import datetime, timedelta
from typing import List, Tuple

from .utils import machine_name


def get_log_files(logdir: str, start_date: datetime) -> List[str]:
    """
    Get list of relevant log files for a given date range.

    Selects the entries of ``logdir`` named ``access_log-YYYYMMDD`` whose
    names lie between the stamp for ``start_date`` and the stamp for seven
    days later (both ends inclusive, compared as plain strings). When no
    entry matches, the single name ``access_log`` is returned instead.

    Args:
        logdir: Directory containing log files
        start_date: Start date for log search

    Returns:
        List of log file names (relative to ``logdir``, not full paths),
        sorted in ascending name order; for the date-stamped names this is
        chronological order.

    Raises:
        OSError: If ``logdir`` cannot be listed.
    """
    end_date = start_date + timedelta(days=7)  # a week after
    lower = (
        f"access_log-{start_date.year:04d}"
        f"{start_date.month:02d}{start_date.day:02d}"
    )
    higher = (
        f"access_log-{end_date.year:04d}"
        f"{end_date.month:02d}{end_date.day:02d}"
    )

    # os.listdir() yields entries in arbitrary order. Sort so the result is
    # deterministic and callers that slice "the last N files" get the most
    # recent ones.
    logfiles = sorted(
        fname for fname in os.listdir(logdir)
        if fname.startswith("access_log-") and lower <= fname <= higher
    )

    if not logfiles:
        # No rotated log in range: fall back to the un-rotated log name.
        logfiles.append('access_log')

    return logfiles


def parse_log_line(line: str) -> Tuple[str, datetime, str] | None:
    """
    Parse a single log line for logon/logoff events.

    Args:
        line: Raw log line

    Returns:
        Tuple of (machine_name, datetime, action) or None if line doesn't match
    """
    if "logonoff" not in line:
        return None

    match = re.match(
        r'^([0-9.]*).*\[(\d+/\w+/\d+:\d+:\d+:\d+).*logonoff\?(.*) HTTP',
        line
    )

    if not match:
        return None

    ip, date_str, action = match.groups()

    try:
        date = datetime.strptime(date_str, '%d/%b/%Y:%H:%M:%S')
        host = machine_name(ip)
        return (host, date, action)
    except ValueError:
        return None


def parse_log_files(logdir: str, logfiles: List[str]) -> dict:
    """
    Parse log files and group events by machine.

    Only the last two entries of ``logfiles`` are read. Each line is passed
    to ``parse_log_line``; lines for which it returns a falsy result are
    ignored. Files that cannot be opened or read are skipped; events already
    collected from such a file before the error are kept.

    Args:
        logdir: Directory containing log files
        logfiles: List of log file names to parse

    Returns:
        Dictionary mapping machine names to list of [datetime, action] pairs,
        in the order the events were read.
    """
    db = {}

    for fname in logfiles[-2:]:  # Only process last 2 files
        filepath = os.path.join(logdir, fname)

        try:
            # errors="replace": one undecodable byte in a log would otherwise
            # raise UnicodeDecodeError (not an OSError) and abort the whole
            # parse instead of costing a single character.
            with open(filepath, errors="replace") as f:
                for line in f:
                    result = parse_log_line(line)
                    if result:
                        host, date, action = result
                        db.setdefault(host, []).append([date, action])
        except OSError:
            # Skip files that can't be opened (IOError is an alias of
            # OSError in Python 3).
            continue

    return db