mirror of
https://github.com/jazzband/django-auditlog.git
synced 2026-03-16 22:20:26 +00:00
Add PostgreSQL partition management and docs
This commit is contained in:
parent
0e58a9d2d5
commit
297702821e
5 changed files with 1125 additions and 0 deletions
|
|
@ -6,6 +6,7 @@
|
|||
|
||||
- Add `AUDITLOG_USE_BASE_MANAGER` setting to override default manager use ([#766](https://github.com/jazzband/django-auditlog/pull/766))
|
||||
- Drop 'Python 3.9' support ([#773](https://github.com/jazzband/django-auditlog/pull/773))
|
||||
- Add PostgreSQL partition management commands (init/create/prune/status) for scaling LogEntry tables ([#781](https://github.com/jazzband/django-auditlog/pull/781))
|
||||
|
||||
## 3.3.0 (2025-09-18)
|
||||
|
||||
|
|
|
|||
|
|
@ -67,3 +67,30 @@ settings.AUDITLOG_MASK_CALLABLE = getattr(settings, "AUDITLOG_MASK_CALLABLE", No
|
|||
settings.AUDITLOG_USE_BASE_MANAGER = getattr(
|
||||
settings, "AUDITLOG_USE_BASE_MANAGER", False
|
||||
)
|
||||
|
||||
# PostgreSQL partitioning defaults
|
||||
settings.AUDITLOG_PARTITIONED = getattr(
|
||||
settings,
|
||||
"AUDITLOG_PARTITIONED",
|
||||
False,
|
||||
)
|
||||
settings.AUDITLOG_PARTITION_BY = getattr(
|
||||
settings,
|
||||
"AUDITLOG_PARTITION_BY",
|
||||
"timestamp",
|
||||
)
|
||||
settings.AUDITLOG_PARTITION_INTERVAL = getattr(
|
||||
settings,
|
||||
"AUDITLOG_PARTITION_INTERVAL",
|
||||
"month",
|
||||
)
|
||||
settings.AUDITLOG_PARTITION_AHEAD_MONTHS = getattr(
|
||||
settings,
|
||||
"AUDITLOG_PARTITION_AHEAD_MONTHS",
|
||||
3,
|
||||
)
|
||||
settings.AUDITLOG_PARTITION_RETENTION_MONTHS = getattr(
|
||||
settings,
|
||||
"AUDITLOG_PARTITION_RETENTION_MONTHS",
|
||||
None,
|
||||
)
|
||||
|
|
|
|||
836
auditlog/management/commands/auditlogpartition.py
Normal file
836
auditlog/management/commands/auditlogpartition.py
Normal file
|
|
@ -0,0 +1,836 @@
|
|||
import contextlib
|
||||
import datetime
|
||||
from collections.abc import Iterable, Iterator
|
||||
from dataclasses import dataclass
|
||||
|
||||
from django.conf import settings
|
||||
from django.core.management.base import BaseCommand, CommandError
|
||||
from django.db import DEFAULT_DB_ALIAS, connections, transaction
|
||||
|
||||
from auditlog.models import LogEntry
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class PartitionBounds:
|
||||
lower: datetime.datetime
|
||||
upper: datetime.datetime
|
||||
|
||||
@property
|
||||
def name_suffix(self) -> str:
|
||||
return f"{self.lower:%Y_%m}"
|
||||
|
||||
|
||||
class Command(BaseCommand):
|
||||
help = "Manage PostgreSQL range partitions for auditlog's LogEntry table."
|
||||
requires_migrations_checks = True
|
||||
|
||||
def add_arguments(self, parser):
|
||||
subparsers = parser.add_subparsers(dest="subcommand")
|
||||
subparsers.required = True
|
||||
|
||||
init_parser = subparsers.add_parser(
|
||||
"init",
|
||||
help=(
|
||||
"Create the partitioned parent table (empty or populated via --convert). "
|
||||
"Only supported on PostgreSQL."
|
||||
),
|
||||
)
|
||||
self._add_common_db_argument(init_parser)
|
||||
init_parser.add_argument(
|
||||
"--ahead",
|
||||
type=int,
|
||||
default=None,
|
||||
help="Number of future months to create partitions for "
|
||||
"(defaults to AUDITLOG_PARTITION_AHEAD_MONTHS).",
|
||||
)
|
||||
init_parser.add_argument(
|
||||
"--retention-months",
|
||||
type=int,
|
||||
default=None,
|
||||
help="Optional retention window in months; used with --convert to pre-create partitions "
|
||||
"covering the retention horizon plus ahead months "
|
||||
"defaults to AUDITLOG_PARTITION_RETENTION_MONTHS).",
|
||||
)
|
||||
init_parser.add_argument(
|
||||
"--convert",
|
||||
action="store_true",
|
||||
help=(
|
||||
"Best-effort conversion when the table already contains rows. Requires downtime "
|
||||
"and runs a full COPY into the partitioned parent."
|
||||
),
|
||||
)
|
||||
|
||||
create_parser = subparsers.add_parser(
|
||||
"create",
|
||||
help="Create monthly partitions for the configured interval.",
|
||||
)
|
||||
self._add_common_db_argument(create_parser)
|
||||
create_parser.add_argument(
|
||||
"--start",
|
||||
type=_parse_year_month,
|
||||
default=None,
|
||||
help="Start month inclusive (YYYY-MM). Defaults to current month.",
|
||||
)
|
||||
create_parser.add_argument(
|
||||
"--end",
|
||||
type=_parse_year_month,
|
||||
default=None,
|
||||
help="End month exclusive (YYYY-MM). Defaults to start + ahead months.",
|
||||
)
|
||||
create_parser.add_argument(
|
||||
"--ahead",
|
||||
type=int,
|
||||
default=None,
|
||||
help="If start/end omitted, create this many months ahead of the current month "
|
||||
"(defaults to AUDITLOG_PARTITION_AHEAD_MONTHS).",
|
||||
)
|
||||
|
||||
prune_parser = subparsers.add_parser(
|
||||
"prune",
|
||||
help="Drop partitions older than the retention window.",
|
||||
)
|
||||
self._add_common_db_argument(prune_parser)
|
||||
prune_parser.add_argument(
|
||||
"--retention-months",
|
||||
type=int,
|
||||
default=None,
|
||||
help="Retention window in months. Defaults to AUDITLOG_PARTITION_RETENTION_MONTHS.",
|
||||
)
|
||||
prune_parser.add_argument(
|
||||
"--dry-run",
|
||||
action="store_true",
|
||||
help="Show partitions that would be dropped without executing.",
|
||||
)
|
||||
|
||||
status_parser = subparsers.add_parser(
|
||||
"status",
|
||||
help="Display partitioning status and existing partitions.",
|
||||
)
|
||||
self._add_common_db_argument(status_parser)
|
||||
|
||||
def _add_common_db_argument(self, parser):
|
||||
parser.add_argument(
|
||||
"--database",
|
||||
default=DEFAULT_DB_ALIAS,
|
||||
help="Database alias to operate on.",
|
||||
)
|
||||
|
||||
def handle(self, *args, **options):
|
||||
subcommand = options.pop("subcommand")
|
||||
database = options.pop("database", DEFAULT_DB_ALIAS)
|
||||
connection = self._get_postgres_connection(database)
|
||||
table = LogEntry._meta.db_table
|
||||
|
||||
if subcommand == "init":
|
||||
self._handle_init(connection, table, **options)
|
||||
elif subcommand == "create":
|
||||
self._handle_create(connection, table, **options)
|
||||
elif subcommand == "prune":
|
||||
self._handle_prune(connection, table, **options)
|
||||
elif subcommand == "status":
|
||||
self._handle_status(connection, table, **options)
|
||||
else:
|
||||
raise CommandError(f"Unknown subcommand: {subcommand}")
|
||||
|
||||
def _handle_init(
|
||||
self,
|
||||
connection,
|
||||
table: str,
|
||||
*,
|
||||
ahead: int | None,
|
||||
retention_months: int | None,
|
||||
convert: bool,
|
||||
**_,
|
||||
):
|
||||
if self._is_partitioned(connection, table):
|
||||
self.stdout.write("Table is already partitioned; no action taken.")
|
||||
return
|
||||
|
||||
ahead = _coalesce_int(ahead, settings.AUDITLOG_PARTITION_AHEAD_MONTHS)
|
||||
retention_months = _coalesce_int(
|
||||
retention_months, settings.AUDITLOG_PARTITION_RETENTION_MONTHS
|
||||
)
|
||||
if ahead is not None and ahead < 0:
|
||||
raise CommandError("Ahead months must be zero or positive.")
|
||||
if retention_months is not None and retention_months < 0:
|
||||
raise CommandError("Retention months must be zero or positive.")
|
||||
|
||||
if not self._table_exists(connection, table):
|
||||
raise CommandError(
|
||||
f"Table '{table}' does not exist. Run migrations before partitioning."
|
||||
)
|
||||
|
||||
schema, base_table = _split_schema_name(table)
|
||||
old_table_base = f"{base_table}_old"
|
||||
old_table = _qualified_name(schema, old_table_base)
|
||||
shadow_table = _qualified_name(schema, f"{base_table}_shadow")
|
||||
|
||||
self._ensure_table_absent(connection, old_table)
|
||||
|
||||
if convert:
|
||||
self._ensure_table_absent(connection, shadow_table)
|
||||
row_count = self._table_rowcount(connection, table)
|
||||
if row_count == 0:
|
||||
self.stdout.write(
|
||||
"Table is empty; falling back to safe initialization."
|
||||
)
|
||||
convert = False
|
||||
else:
|
||||
self._convert_existing_table(
|
||||
connection=connection,
|
||||
table=table,
|
||||
schema=schema,
|
||||
base_table=base_table,
|
||||
shadow_table=shadow_table,
|
||||
old_table=old_table,
|
||||
old_table_base=old_table_base,
|
||||
ahead=ahead,
|
||||
retention_months=retention_months,
|
||||
row_count=row_count,
|
||||
)
|
||||
self.stdout.write("Partitioning initialized successfully.")
|
||||
return
|
||||
|
||||
self._initialize_empty_table(
|
||||
connection=connection,
|
||||
table=table,
|
||||
schema=schema,
|
||||
base_table=base_table,
|
||||
old_table=old_table,
|
||||
ahead=ahead,
|
||||
retention_months=retention_months,
|
||||
)
|
||||
self.stdout.write("Partitioning initialized successfully.")
|
||||
|
||||
def _initialize_empty_table(
|
||||
self,
|
||||
connection,
|
||||
table: str,
|
||||
schema: str | None,
|
||||
base_table: str,
|
||||
old_table: str,
|
||||
ahead: int | None,
|
||||
retention_months: int | None,
|
||||
):
|
||||
row_count = self._table_rowcount(connection, table)
|
||||
if row_count != 0:
|
||||
raise CommandError(
|
||||
"Table is not empty. Use --convert to perform best-effort conversion."
|
||||
)
|
||||
|
||||
partition_months = self._collect_partition_months(
|
||||
bounds=None,
|
||||
ahead=ahead,
|
||||
retention_months=retention_months,
|
||||
)
|
||||
partitions = [_partition_bounds_for_month(month) for month in partition_months]
|
||||
|
||||
with transaction.atomic(using=connection.alias):
|
||||
self._lock_table(connection, table)
|
||||
self._rename_table(connection, table, f"{base_table}_old")
|
||||
self._create_partitioned_parent(connection, table, old_table)
|
||||
|
||||
all_indexes = list(self._indexed_columns(connection))
|
||||
for partition in partitions:
|
||||
self._create_partition(connection, table, partition)
|
||||
|
||||
self._create_partition_indexes(connection, table, all_indexes)
|
||||
sequence_name = self._prepare_sequence(connection, table, old_table)
|
||||
if sequence_name:
|
||||
self._reset_sequence(connection, table, sequence_name, empty=True)
|
||||
|
||||
self._drop_table(connection, old_table)
|
||||
|
||||
def _convert_existing_table(
|
||||
self,
|
||||
connection,
|
||||
table: str,
|
||||
schema: str | None,
|
||||
base_table: str,
|
||||
shadow_table: str,
|
||||
old_table: str,
|
||||
old_table_base: str,
|
||||
ahead: int | None,
|
||||
retention_months: int | None,
|
||||
row_count: int,
|
||||
):
|
||||
bounds = self._timestamp_bounds(connection, table)
|
||||
if bounds is None:
|
||||
raise CommandError("Unable to determine timestamp bounds for conversion.")
|
||||
|
||||
partition_months = self._collect_partition_months(
|
||||
bounds=bounds,
|
||||
ahead=ahead,
|
||||
retention_months=retention_months,
|
||||
)
|
||||
partitions = [_partition_bounds_for_month(month) for month in partition_months]
|
||||
initial_max_id = self._table_max_id(connection, table)
|
||||
sequence = self._sequence_name(connection, table)
|
||||
|
||||
self.stdout.write(
|
||||
f"Converting populated table with approximately {row_count} rows. "
|
||||
"Minimal-downtime swap in progress..."
|
||||
)
|
||||
|
||||
shadow_created = False
|
||||
try:
|
||||
self._create_partitioned_parent(connection, shadow_table, table)
|
||||
shadow_created = True
|
||||
|
||||
all_indexes = list(self._indexed_columns(connection))
|
||||
for partition in partitions:
|
||||
self._create_partition(connection, shadow_table, partition)
|
||||
|
||||
self._create_partition_indexes(connection, shadow_table, all_indexes)
|
||||
if sequence:
|
||||
self._set_sequence_default(connection, shadow_table, sequence)
|
||||
|
||||
self._copy_table_to_shadow(
|
||||
connection=connection,
|
||||
source_table=table,
|
||||
target_table=shadow_table,
|
||||
partitions=partitions,
|
||||
max_id_snapshot=initial_max_id,
|
||||
)
|
||||
|
||||
with transaction.atomic(using=connection.alias):
|
||||
self._lock_table(connection, table)
|
||||
self._lock_table(connection, shadow_table)
|
||||
|
||||
self._sync_delta_rows(
|
||||
connection=connection,
|
||||
source_table=table,
|
||||
target_table=shadow_table,
|
||||
last_copied_id=initial_max_id,
|
||||
)
|
||||
|
||||
self._ensure_table_absent(connection, old_table)
|
||||
|
||||
self._rename_table(connection, table, old_table_base)
|
||||
self._rename_table(connection, shadow_table, base_table)
|
||||
shadow_created = False
|
||||
|
||||
new_table = _qualified_name(schema, base_table)
|
||||
if sequence:
|
||||
self._assign_sequence_owner(connection, sequence, new_table)
|
||||
self._set_sequence_default(connection, new_table, sequence)
|
||||
self._reset_sequence(connection, new_table, sequence)
|
||||
else:
|
||||
new_sequence = self._sequence_name(connection, new_table)
|
||||
if new_sequence:
|
||||
self._reset_sequence(connection, new_table, new_sequence)
|
||||
|
||||
self._drop_table(connection, old_table)
|
||||
finally:
|
||||
if shadow_created:
|
||||
self._drop_table_if_exists(connection, shadow_table)
|
||||
|
||||
def _handle_create(
|
||||
self,
|
||||
connection,
|
||||
table: str,
|
||||
*,
|
||||
start: datetime.date | None,
|
||||
end: datetime.date | None,
|
||||
ahead: int | None,
|
||||
**_,
|
||||
):
|
||||
if not self._is_partitioned(connection, table):
|
||||
raise CommandError(
|
||||
"Table is not partitioned. Run 'auditlogpartition init' first."
|
||||
)
|
||||
|
||||
ahead = _coalesce_int(ahead, settings.AUDITLOG_PARTITION_AHEAD_MONTHS)
|
||||
if ahead is not None and ahead < 0:
|
||||
raise CommandError("Ahead months must be zero or positive.")
|
||||
today_month = _month_start(datetime.date.today())
|
||||
start_month = start or today_month
|
||||
if end:
|
||||
end_month = end
|
||||
else:
|
||||
ahead_value = max(ahead or 0, 0)
|
||||
end_month = _add_months(start_month, ahead_value + 1)
|
||||
|
||||
if end_month <= start_month:
|
||||
raise CommandError("End month must be after start month.")
|
||||
|
||||
created = 0
|
||||
for month in _iter_months(start_month, end_month, inclusive=False):
|
||||
partition = _partition_bounds_for_month(month)
|
||||
if self._create_partition(connection, table, partition):
|
||||
created += 1
|
||||
|
||||
if created:
|
||||
self.stdout.write(f"Created {created} partition(s).")
|
||||
else:
|
||||
self.stdout.write("No new partitions were created.")
|
||||
|
||||
def _handle_prune(
|
||||
self,
|
||||
connection,
|
||||
table: str,
|
||||
*,
|
||||
retention_months: int | None,
|
||||
dry_run: bool,
|
||||
**_,
|
||||
):
|
||||
if not self._is_partitioned(connection, table):
|
||||
raise CommandError(
|
||||
"Table is not partitioned. Run 'auditlogpartition init' first."
|
||||
)
|
||||
|
||||
retention_months = _coalesce_int(
|
||||
retention_months, settings.AUDITLOG_PARTITION_RETENTION_MONTHS
|
||||
)
|
||||
if retention_months is None:
|
||||
raise CommandError(
|
||||
"Retention window is not configured. Provide --retention-months "
|
||||
"or set AUDITLOG_PARTITION_RETENTION_MONTHS."
|
||||
)
|
||||
if retention_months <= 0:
|
||||
raise CommandError("Retention months must be greater than zero.")
|
||||
|
||||
cutoff_month = _add_months(
|
||||
_month_start(datetime.date.today()), -retention_months
|
||||
)
|
||||
partitions = list(self._list_partitions(connection, table))
|
||||
drop_candidates = [p for p in partitions if p.lower.date() < cutoff_month]
|
||||
|
||||
if not drop_candidates:
|
||||
self.stdout.write("No partitions eligible for pruning.")
|
||||
return
|
||||
|
||||
if dry_run:
|
||||
self.stdout.write("Partitions that would be dropped:")
|
||||
for part in drop_candidates:
|
||||
self.stdout.write(
|
||||
f" - {self._partition_name(table, part)} "
|
||||
f"[{part.lower.isoformat()} → {part.upper.isoformat()})"
|
||||
)
|
||||
return
|
||||
|
||||
for part in drop_candidates:
|
||||
self._drop_partition(connection, table, part)
|
||||
|
||||
self.stdout.write(f"Dropped {len(drop_candidates)} partition(s).")
|
||||
|
||||
def _handle_status(self, connection, table: str, **_):
|
||||
partitioned = self._is_partitioned(connection, table)
|
||||
if not partitioned:
|
||||
self.stdout.write("Partitioned: no")
|
||||
return
|
||||
|
||||
self.stdout.write("Partitioned: yes")
|
||||
partitions = list(self._list_partitions(connection, table))
|
||||
if not partitions:
|
||||
self.stdout.write("No partitions found.")
|
||||
return
|
||||
|
||||
self.stdout.write("Partitions:")
|
||||
for part in partitions:
|
||||
name = self._partition_name(table, part)
|
||||
self.stdout.write(
|
||||
f" - {name} [{part.lower.isoformat()} → {part.upper.isoformat()})"
|
||||
)
|
||||
|
||||
def _get_postgres_connection(self, alias: str):
|
||||
try:
|
||||
connection = connections[alias]
|
||||
except KeyError:
|
||||
raise CommandError(f"Unknown database alias '{alias}'.")
|
||||
|
||||
if connection.vendor != "postgresql":
|
||||
raise CommandError(
|
||||
f"auditlogpartition only supports PostgreSQL. Database '{alias}' "
|
||||
f"uses vendor '{connection.vendor}'."
|
||||
)
|
||||
return connection
|
||||
|
||||
def _lock_table(self, connection, table: str):
|
||||
with connection.cursor() as cursor:
|
||||
cursor.execute(
|
||||
f"LOCK TABLE {self._qn(connection, table)} IN ACCESS EXCLUSIVE MODE;"
|
||||
)
|
||||
|
||||
def _rename_table(self, connection, table: str, new_base_name: str):
|
||||
if "." in new_base_name:
|
||||
raise CommandError("New table name must not include a schema qualifier.")
|
||||
with connection.cursor() as cursor:
|
||||
cursor.execute(
|
||||
"ALTER TABLE {table} RENAME TO {name};".format(
|
||||
table=self._qn(connection, table),
|
||||
name=connection.ops.quote_name(new_base_name),
|
||||
)
|
||||
)
|
||||
|
||||
def _create_partitioned_parent(self, connection, table: str, old_table: str):
|
||||
with connection.cursor() as cursor:
|
||||
cursor.execute(
|
||||
f"""
|
||||
CREATE TABLE {self._qn(connection, table)} (
|
||||
LIKE {self._qn(connection, old_table)}
|
||||
INCLUDING DEFAULTS
|
||||
INCLUDING GENERATED
|
||||
INCLUDING STORAGE
|
||||
INCLUDING COMMENTS
|
||||
)
|
||||
PARTITION BY RANGE (timestamp);
|
||||
"""
|
||||
)
|
||||
|
||||
def _create_partition_indexes(self, connection, table: str, columns: Iterable[str]):
|
||||
base_table = _base_table_name(table)
|
||||
for column in columns:
|
||||
index_name = f"{base_table}_{column}_idx"
|
||||
with connection.cursor() as cursor:
|
||||
cursor.execute(
|
||||
f"CREATE INDEX IF NOT EXISTS {self._qn(connection, index_name)} "
|
||||
f"ON {self._qn(connection, table)} ({self._qn(connection, column, allow_schema=False)});"
|
||||
)
|
||||
|
||||
def _prepare_sequence(self, connection, table: str, old_table: str) -> str | None:
|
||||
sequence = self._sequence_name(connection, old_table)
|
||||
if not sequence:
|
||||
return None
|
||||
|
||||
target = self._target_sequence_name(sequence, table)
|
||||
if target != sequence:
|
||||
sequence = self._rename_sequence(connection, sequence, target)
|
||||
|
||||
self._assign_sequence_owner(connection, sequence, table)
|
||||
self._set_sequence_default(connection, table, sequence)
|
||||
return sequence
|
||||
|
||||
def _bulk_copy_into_parent(self, connection, table: str, old_table: str):
|
||||
with connection.cursor() as cursor:
|
||||
cursor.execute(
|
||||
f"INSERT INTO {self._qn(connection, table)} SELECT * FROM {self._qn(connection, old_table)};"
|
||||
)
|
||||
|
||||
def _assign_sequence_owner(self, connection, sequence: str, table: str):
|
||||
with connection.cursor() as cursor:
|
||||
cursor.execute(
|
||||
f"ALTER SEQUENCE {self._qn(connection, sequence)} OWNED BY {self._qn(connection, table)}.id;"
|
||||
)
|
||||
|
||||
def _set_sequence_default(self, connection, table: str, sequence: str):
|
||||
with connection.cursor() as cursor:
|
||||
cursor.execute(
|
||||
f"ALTER TABLE {self._qn(connection, table)} ALTER COLUMN id SET DEFAULT nextval(%s);",
|
||||
[sequence],
|
||||
)
|
||||
|
||||
def _reset_sequence(
|
||||
self, connection, table: str, sequence: str, empty: bool = False
|
||||
):
|
||||
with connection.cursor() as cursor:
|
||||
if empty:
|
||||
cursor.execute(
|
||||
"SELECT setval(%s, 1, false);",
|
||||
[sequence],
|
||||
)
|
||||
else:
|
||||
cursor.execute(
|
||||
"SELECT setval(%s, COALESCE((SELECT MAX(id) FROM {table}), 1), true);".format(
|
||||
table=self._qn(connection, table)
|
||||
),
|
||||
[sequence],
|
||||
)
|
||||
|
||||
def _drop_table(self, connection, table: str):
|
||||
with connection.cursor() as cursor:
|
||||
cursor.execute(f"DROP TABLE {self._qn(connection, table)};")
|
||||
|
||||
def _create_partition(
|
||||
self, connection, table: str, partition: PartitionBounds
|
||||
) -> bool:
|
||||
partition_name = self._partition_name(table, partition)
|
||||
if self._table_exists(connection, partition_name):
|
||||
return False
|
||||
|
||||
with connection.cursor() as cursor:
|
||||
cursor.execute(
|
||||
f"""
|
||||
CREATE TABLE {self._qn(connection, partition_name)} PARTITION OF {self._qn(connection, table)}
|
||||
FOR VALUES FROM (%s) TO (%s);
|
||||
""",
|
||||
[partition.lower, partition.upper],
|
||||
)
|
||||
return True
|
||||
|
||||
def _drop_partition(self, connection, table: str, partition: PartitionBounds):
|
||||
partition_name = self._partition_name(table, partition)
|
||||
with connection.cursor() as cursor:
|
||||
cursor.execute(
|
||||
f"DROP TABLE IF EXISTS {self._qn(connection, partition_name)};"
|
||||
)
|
||||
|
||||
def _partition_name(self, table: str, partition: PartitionBounds) -> str:
|
||||
schema, base = _split_schema_name(table)
|
||||
name = f"{base}_{partition.name_suffix}"
|
||||
return f"{schema}.{name}" if schema else name
|
||||
|
||||
def _table_exists(self, connection, table: str) -> bool:
|
||||
with connection.cursor() as cursor:
|
||||
cursor.execute(
|
||||
"SELECT to_regclass(%s) IS NOT NULL;",
|
||||
[table],
|
||||
)
|
||||
return cursor.fetchone()[0]
|
||||
|
||||
def _is_partitioned(self, connection, table: str) -> bool:
|
||||
with connection.cursor() as cursor:
|
||||
cursor.execute(
|
||||
"SELECT EXISTS (SELECT 1 FROM pg_partitioned_table WHERE partrelid = %s::regclass);",
|
||||
[table],
|
||||
)
|
||||
return cursor.fetchone()[0]
|
||||
|
||||
def _list_partitions(self, connection, table: str) -> Iterator[PartitionBounds]:
|
||||
with connection.cursor() as cursor:
|
||||
cursor.execute(
|
||||
"""
|
||||
SELECT
|
||||
pg_get_expr(child.relpartbound, child.oid) AS bound
|
||||
FROM pg_inherits
|
||||
JOIN pg_class parent ON parent.oid = pg_inherits.inhparent
|
||||
JOIN pg_class child ON child.oid = pg_inherits.inhrelid
|
||||
WHERE parent.oid = %s::regclass
|
||||
ORDER BY bound;
|
||||
""",
|
||||
[table],
|
||||
)
|
||||
for (bound,) in cursor.fetchall():
|
||||
parsed = _parse_partition_bound(bound)
|
||||
if parsed:
|
||||
yield parsed
|
||||
|
||||
def _indexed_columns(self, connection) -> Iterator[str]:
|
||||
model = LogEntry
|
||||
for field in model._meta.local_fields:
|
||||
if field.primary_key:
|
||||
continue
|
||||
if getattr(field, "db_index", False) or field.unique:
|
||||
yield field.column
|
||||
for index in getattr(model._meta, "indexes", []):
|
||||
with contextlib.suppress(IndexError):
|
||||
yield index.fields[0]
|
||||
|
||||
def _table_rowcount(self, connection, table: str) -> int:
|
||||
with connection.cursor() as cursor:
|
||||
cursor.execute(f"SELECT COUNT(*) FROM {self._qn(connection, table)};")
|
||||
return cursor.fetchone()[0]
|
||||
|
||||
def _table_max_id(self, connection, table: str) -> int:
|
||||
with connection.cursor() as cursor:
|
||||
cursor.execute(
|
||||
f"SELECT COALESCE(MAX(id), 0) FROM {self._qn(connection, table)};"
|
||||
)
|
||||
return cursor.fetchone()[0] or 0
|
||||
|
||||
def _timestamp_bounds(self, connection, table: str) -> PartitionBounds | None:
|
||||
with connection.cursor() as cursor:
|
||||
cursor.execute(
|
||||
f"SELECT MIN(timestamp), MAX(timestamp) FROM {self._qn(connection, table)};"
|
||||
)
|
||||
result = cursor.fetchone()
|
||||
if not result or result[0] is None or result[1] is None:
|
||||
return None
|
||||
min_ts, max_ts = result
|
||||
# Ensure upper bound extends to cover the full month of the max timestamp.
|
||||
max_month = _month_start(max_ts.date())
|
||||
upper = _partition_bounds_for_month(max_month).upper
|
||||
return PartitionBounds(lower=result[0], upper=upper)
|
||||
|
||||
def _sequence_name(self, connection, table: str) -> str | None:
|
||||
with connection.cursor() as cursor:
|
||||
cursor.execute(
|
||||
"SELECT pg_get_serial_sequence(%s, 'id');",
|
||||
[table],
|
||||
)
|
||||
row = cursor.fetchone()
|
||||
if row and row[0]:
|
||||
return row[0]
|
||||
return None
|
||||
|
||||
def _target_sequence_name(self, sequence: str, table: str) -> str:
|
||||
schema, _ = _split_schema_name(sequence)
|
||||
base_table = _base_table_name(table)
|
||||
target = f"{base_table}_id_seq"
|
||||
return f"{schema}.{target}" if schema else target
|
||||
|
||||
def _rename_sequence(self, connection, sequence: str, target: str) -> str:
|
||||
schema, _ = _split_schema_name(sequence)
|
||||
target_schema, target_name = _split_schema_name(target)
|
||||
if schema and target_schema and schema != target_schema:
|
||||
raise CommandError("Cannot move sequence across schemas.")
|
||||
with connection.cursor() as cursor:
|
||||
cursor.execute(
|
||||
"ALTER SEQUENCE {sequence} RENAME TO {name};".format(
|
||||
sequence=self._qn(connection, sequence),
|
||||
name=connection.ops.quote_name(target_name),
|
||||
)
|
||||
)
|
||||
final_schema = schema or target_schema
|
||||
return f"{final_schema}.{target_name}" if final_schema else target_name
|
||||
|
||||
def _collect_partition_months(
|
||||
self,
|
||||
bounds: PartitionBounds | None,
|
||||
ahead: int | None,
|
||||
retention_months: int | None,
|
||||
) -> list[datetime.date]:
|
||||
months: set[datetime.date] = set()
|
||||
today_month = _month_start(datetime.date.today())
|
||||
|
||||
if bounds:
|
||||
min_month = _month_start(bounds.lower.date())
|
||||
max_month = _add_months(_month_start(bounds.upper.date()), -1)
|
||||
end_month = _add_months(max_month, 1)
|
||||
for month in _iter_months(min_month, end_month, inclusive=False):
|
||||
months.add(month)
|
||||
else:
|
||||
months.add(today_month)
|
||||
|
||||
if retention_months and retention_months > 0:
|
||||
retention_start = _add_months(today_month, -retention_months)
|
||||
for month in _iter_months(retention_start, today_month, inclusive=False):
|
||||
months.add(month)
|
||||
|
||||
ahead_value = max(ahead or 0, 0)
|
||||
for offset in range(ahead_value + 1):
|
||||
months.add(_add_months(today_month, offset))
|
||||
|
||||
return sorted(months)
|
||||
|
||||
def _copy_table_to_shadow(
|
||||
self,
|
||||
connection,
|
||||
source_table: str,
|
||||
target_table: str,
|
||||
partitions: Iterable[PartitionBounds],
|
||||
max_id_snapshot: int,
|
||||
):
|
||||
for partition in partitions:
|
||||
params: list[object] = [partition.lower, partition.upper]
|
||||
query = (
|
||||
f"INSERT INTO {self._qn(connection, target_table)} "
|
||||
f"SELECT * FROM {self._qn(connection, source_table)} "
|
||||
f"WHERE timestamp >= %s AND timestamp < %s"
|
||||
)
|
||||
if max_id_snapshot > 0:
|
||||
query += " AND id <= %s"
|
||||
params.append(max_id_snapshot)
|
||||
with connection.cursor() as cursor:
|
||||
cursor.execute(query, params)
|
||||
|
||||
def _sync_delta_rows(
|
||||
self,
|
||||
connection,
|
||||
source_table: str,
|
||||
target_table: str,
|
||||
last_copied_id: int,
|
||||
):
|
||||
with connection.cursor() as cursor:
|
||||
if last_copied_id > 0:
|
||||
cursor.execute(
|
||||
f"INSERT INTO {self._qn(connection, target_table)} "
|
||||
f"SELECT * FROM {self._qn(connection, source_table)} "
|
||||
f"WHERE id > %s;",
|
||||
[last_copied_id],
|
||||
)
|
||||
else:
|
||||
cursor.execute(
|
||||
f"INSERT INTO {self._qn(connection, target_table)} "
|
||||
f"SELECT * FROM {self._qn(connection, source_table)};"
|
||||
)
|
||||
|
||||
def _ensure_table_absent(self, connection, table: str):
|
||||
if self._table_exists(connection, table):
|
||||
raise CommandError(
|
||||
f"Temporary table '{table}' already exists. "
|
||||
"Drop or rename it before running init."
|
||||
)
|
||||
|
||||
def _drop_table_if_exists(self, connection, table: str):
|
||||
if self._table_exists(connection, table):
|
||||
self._drop_table(connection, table)
|
||||
|
||||
def _qn(self, connection, name: str, allow_schema: bool = True) -> str:
|
||||
if allow_schema and "." in name:
|
||||
schema, obj = name.split(".", 1)
|
||||
return (
|
||||
f"{connection.ops.quote_name(schema)}.{connection.ops.quote_name(obj)}"
|
||||
)
|
||||
return connection.ops.quote_name(name)
|
||||
|
||||
|
||||
def _parse_year_month(value: str) -> datetime.date:
|
||||
try:
|
||||
year, month = map(int, value.split("-", 1))
|
||||
return datetime.date(year=year, month=month, day=1)
|
||||
except Exception as exc: # pragma: no cover - arg parsing ensures int
|
||||
raise CommandError(f"Invalid YYYY-MM value '{value}'.") from exc
|
||||
|
||||
|
||||
def _split_schema_name(qualified_name: str):
|
||||
if "." in qualified_name:
|
||||
schema, name = qualified_name.split(".", 1)
|
||||
return schema, name
|
||||
return None, qualified_name
|
||||
|
||||
|
||||
def _base_table_name(table: str) -> str:
|
||||
return table.split(".", 1)[-1]
|
||||
|
||||
|
||||
def _qualified_name(schema: str | None, name: str) -> str:
|
||||
return f"{schema}.{name}" if schema else name
|
||||
|
||||
|
||||
def _coalesce_int(value: int | None, default: int | None) -> int | None:
|
||||
return value if value is not None else default
|
||||
|
||||
|
||||
def _month_start(date_value: datetime.date) -> datetime.date:
|
||||
return date_value.replace(day=1)
|
||||
|
||||
|
||||
def _add_months(date_value: datetime.date, months: int) -> datetime.date:
|
||||
year = date_value.year + (date_value.month - 1 + months) // 12
|
||||
month = (date_value.month - 1 + months) % 12 + 1
|
||||
return datetime.date(year, month, 1)
|
||||
|
||||
|
||||
def _iter_months(
|
||||
start: datetime.date, end: datetime.date, *, inclusive: bool
|
||||
) -> Iterator[datetime.date]:
|
||||
current = start
|
||||
while current < end if not inclusive else current <= end:
|
||||
yield current
|
||||
current = _add_months(current, 1)
|
||||
|
||||
|
||||
def _partition_bounds_for_month(month: datetime.date) -> PartitionBounds:
|
||||
lower = datetime.datetime.combine(month, datetime.time.min).replace(
|
||||
tzinfo=datetime.timezone.utc
|
||||
)
|
||||
upper_month = _add_months(month, 1)
|
||||
upper = datetime.datetime.combine(upper_month, datetime.time.min).replace(
|
||||
tzinfo=datetime.timezone.utc
|
||||
)
|
||||
return PartitionBounds(lower=lower, upper=upper)
|
||||
|
||||
|
||||
def _parse_partition_bound(bound: str) -> PartitionBounds | None:
|
||||
# Example: "FOR VALUES FROM ('2025-11-01 00:00:00+00') TO ('2025-12-01 00:00:00+00')"
|
||||
try:
|
||||
tokens = bound.replace("FOR VALUES FROM (", "").replace(")", "")
|
||||
lower_part, _, upper_part = tokens.partition(" TO ")
|
||||
lower_str = lower_part.strip(" '")
|
||||
upper_str = upper_part.strip(" '")
|
||||
lower_dt = datetime.datetime.fromisoformat(lower_str)
|
||||
upper_dt = datetime.datetime.fromisoformat(upper_str)
|
||||
return PartitionBounds(lower=lower_dt, upper=upper_dt)
|
||||
except Exception:
|
||||
return None
|
||||
160
auditlog_tests/test_partition_command.py
Normal file
160
auditlog_tests/test_partition_command.py
Normal file
|
|
@ -0,0 +1,160 @@
|
|||
"""
|
||||
Tests for the auditlogpartition management command.
|
||||
"""
|
||||
|
||||
from io import StringIO
|
||||
from unittest import mock, skipIf
|
||||
|
||||
import freezegun
|
||||
from django.conf import settings
|
||||
from django.core.management import call_command
|
||||
from django.db import connection
|
||||
from django.db.models import Max
|
||||
from django.test import TransactionTestCase
|
||||
from test_app.models import SimpleModel
|
||||
|
||||
from auditlog.management.commands.auditlogpartition import Command
|
||||
from auditlog.models import LogEntry
|
||||
|
||||
|
||||
def _table_name() -> str:
|
||||
return LogEntry._meta.db_table
|
||||
|
||||
|
||||
@skipIf(settings.TEST_DB_BACKEND != "postgresql", "PostgreSQL-specific test")
|
||||
class AuditlogPartitionCommandTest(TransactionTestCase):
|
||||
databases = "__all__"
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self._reset_schema()
|
||||
|
||||
def tearDown(self):
|
||||
self._reset_schema()
|
||||
super().tearDown()
|
||||
|
||||
def _reset_schema(self):
|
||||
call_command("migrate", "auditlog", "zero", verbosity=0, database="default")
|
||||
call_command("migrate", "auditlog", verbosity=0, database="default")
|
||||
|
||||
def _call_partition(self, *args: str) -> str:
|
||||
out = StringIO()
|
||||
call_command(
|
||||
"auditlogpartition",
|
||||
*args,
|
||||
stdout=out,
|
||||
stderr=StringIO(),
|
||||
)
|
||||
return out.getvalue()
|
||||
|
||||
def _is_partitioned(self) -> bool:
|
||||
with connection.cursor() as cursor:
|
||||
cursor.execute(
|
||||
"SELECT EXISTS (SELECT 1 FROM pg_partitioned_table WHERE partrelid = %s::regclass);",
|
||||
[_table_name()],
|
||||
)
|
||||
return cursor.fetchone()[0]
|
||||
|
||||
def _partition_suffixes(self) -> set[str]:
|
||||
with connection.cursor() as cursor:
|
||||
cursor.execute(
|
||||
"""
|
||||
SELECT
|
||||
child.relname
|
||||
FROM pg_inherits
|
||||
JOIN pg_class parent ON parent.oid = pg_inherits.inhparent
|
||||
JOIN pg_class child ON child.oid = pg_inherits.inhrelid
|
||||
WHERE parent.oid = %s::regclass
|
||||
ORDER BY child.relname;
|
||||
""",
|
||||
[_table_name()],
|
||||
)
|
||||
names = [row[0] for row in cursor.fetchall()]
|
||||
suffixes = set()
|
||||
for name in names:
|
||||
suffix = name.split("_")[-2:]
|
||||
suffixes.add("_".join(suffix))
|
||||
return suffixes
|
||||
|
||||
def _ensure_partitioned(self):
|
||||
if not self._is_partitioned():
|
||||
with freezegun.freeze_time("2025-11-15"):
|
||||
self._call_partition("init", "--ahead=0")
|
||||
|
||||
def test_init_safe_creates_partitions(self):
|
||||
with freezegun.freeze_time("2025-11-15"):
|
||||
out = self._call_partition("init", "--ahead=1")
|
||||
|
||||
self.assertIn("Partitioning initialized successfully.", out)
|
||||
self.assertTrue(self._is_partitioned())
|
||||
suffixes = self._partition_suffixes()
|
||||
self.assertIn("2025_11", suffixes)
|
||||
self.assertIn("2025_12", suffixes)
|
||||
|
||||
def test_init_convert_moves_rows(self):
|
||||
with freezegun.freeze_time("2025-01-12"):
|
||||
SimpleModel.objects.create(text="first")
|
||||
with freezegun.freeze_time("2025-03-18"):
|
||||
obj = SimpleModel.objects.first()
|
||||
obj.text = "updated"
|
||||
obj.save()
|
||||
|
||||
initial_count = LogEntry.objects.count()
|
||||
original_copy = Command._copy_table_to_shadow
|
||||
|
||||
def patched_copy(self, *args, **kwargs):
|
||||
result = original_copy(self, *args, **kwargs)
|
||||
with freezegun.freeze_time("2025-03-20"):
|
||||
SimpleModel.objects.create(text="late convert")
|
||||
return result
|
||||
|
||||
with (
|
||||
freezegun.freeze_time("2025-04-01"),
|
||||
mock.patch.object(Command, "_copy_table_to_shadow", patched_copy),
|
||||
):
|
||||
self._call_partition("init", "--convert", "--ahead=0")
|
||||
|
||||
self.assertTrue(self._is_partitioned())
|
||||
self.assertEqual(LogEntry.objects.count(), initial_count + 1)
|
||||
|
||||
late_instance = SimpleModel.objects.get(text="late convert")
|
||||
self.assertEqual(late_instance.history.count(), 1)
|
||||
|
||||
suffixes = self._partition_suffixes()
|
||||
self.assertIn("2025_01", suffixes)
|
||||
self.assertIn("2025_03", suffixes)
|
||||
|
||||
max_id = LogEntry.objects.aggregate(max_id=Max("id"))["max_id"]
|
||||
with freezegun.freeze_time("2025-04-05"):
|
||||
SimpleModel.objects.create(text="another")
|
||||
new_max = LogEntry.objects.aggregate(max_id=Max("id"))["max_id"]
|
||||
self.assertGreater(new_max, max_id)
|
||||
self.assertEqual(LogEntry.objects.count(), initial_count + 2)
|
||||
|
||||
def test_create_adds_future_partitions(self):
|
||||
self._ensure_partitioned()
|
||||
with freezegun.freeze_time("2025-11-15"):
|
||||
before = self._partition_suffixes()
|
||||
self._call_partition("create", "--start=2026-01", "--ahead=2")
|
||||
after = self._partition_suffixes()
|
||||
|
||||
self.assertTrue({"2026_01", "2026_02", "2026_03"}.issubset(after))
|
||||
self.assertLessEqual(before, after)
|
||||
|
||||
def test_prune_drops_old_partitions(self):
|
||||
self._ensure_partitioned()
|
||||
with freezegun.freeze_time("2024-01-15"):
|
||||
self._call_partition("create", "--start=2023-10", "--ahead=0")
|
||||
with freezegun.freeze_time("2025-11-15"):
|
||||
suffixes_before = self._partition_suffixes()
|
||||
self.assertIn("2023_10", suffixes_before)
|
||||
self._call_partition("prune", "--retention-months=6")
|
||||
suffixes_after = self._partition_suffixes()
|
||||
|
||||
self.assertNotIn("2023_10", suffixes_after)
|
||||
|
||||
def test_status_outputs_summary(self):
|
||||
self._ensure_partitioned()
|
||||
out = self._call_partition("status")
|
||||
self.assertIn("Partitioned: yes", out)
|
||||
self.assertIn("auditlog_logentry_", out)
|
||||
|
|
@ -223,6 +223,80 @@ You can store a correlation ID (cid) in the log entries by:
|
|||
Using the custom getter is helpful for integrating with a third-party cid package
|
||||
such as `django-cid <https://pypi.org/project/django-cid/>`_.
|
||||
|
||||
PostgreSQL partitioning
|
||||
-----------------------
|
||||
|
||||
``auditlog`` can optionally store ``LogEntry`` rows in native PostgreSQL range
|
||||
partitions. This is useful for installations that accumulate hundreds of
|
||||
millions of audit rows and rely on time-based retention policies.
|
||||
|
||||
Prerequisites
|
||||
~~~~~~~~~~~~~
|
||||
|
||||
* PostgreSQL 11 or newer.
|
||||
* The database user running the management commands must have privileges to
|
||||
``LOCK``, ``ALTER`` and ``CREATE`` tables and sequences.
|
||||
|
||||
Partition lifecycle
|
||||
~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
1. Run the initialization command once per database:
|
||||
|
||||
.. code-block:: bash
|
||||
|
||||
python manage.py auditlogpartition init
|
||||
|
||||
The default mode only works on an empty ``auditlog_logentry`` table. Add
|
||||
``--convert`` to migrate a populated table. The command snapshots the current
|
||||
rows, copies them into a partitioned shadow table while the application
|
||||
continues to write, then briefly locks ``auditlog_logentry`` to copy the
|
||||
remaining delta and swap the tables. Downtime is limited to that final swap.
|
||||
A successful run renames the old table, creates ``auditlog_logentry`` as
|
||||
``PARTITION BY RANGE (timestamp)``, and builds partitions for the current
|
||||
month plus the configured number of future months.
|
||||
|
||||
2. Create additional partitions on demand:
|
||||
|
||||
.. code-block:: bash
|
||||
|
||||
python manage.py auditlogpartition create --ahead=6
|
||||
|
||||
``create`` accepts ``--start`` / ``--end`` (``YYYY-MM``) to materialise a
|
||||
specific range.
|
||||
|
||||
3. Prune old partitions based on your retention window:
|
||||
|
||||
.. code-block:: bash
|
||||
|
||||
python manage.py auditlogpartition prune --retention-months=12
|
||||
|
||||
``prune`` just drops partitions, so it is much faster than row-by-row
|
||||
deletes. Add ``--dry-run`` to see what would be removed.
|
||||
|
||||
4. Inspect partition status at any time:
|
||||
|
||||
.. code-block:: bash
|
||||
|
||||
python manage.py auditlogpartition status
|
||||
|
||||
Operational notes
|
||||
~~~~~~~~~~~~~~~~~
|
||||
|
||||
* ``auditlog`` continues to use the same Django model; all inserts go through
|
||||
the parent table and are automatically routed to the correct partition.
|
||||
* The database sequence is preserved during conversion and reset to the highest
|
||||
``LogEntry.id`` value, so future inserts continue from the correct number.
|
||||
* PostgreSQL cannot enforce a primary key on ``id`` once the table is
|
||||
partitioned because the partition key is ``timestamp``. ``LogEntry`` never had
|
||||
foreign keys or unique constraints in ``auditlog``, so Django's ORM behaviour
|
||||
is unchanged. Document this caveat for your ops team.
|
||||
* Prefer ``auditlogpartition prune`` over ``auditlogflush --before-date`` on
|
||||
very large datasets. Dropping a partition is nearly instantaneous.
|
||||
* For large conversions, consider temporarily increasing ``maintenance_work_mem``
|
||||
and disabling synchronous commits for the session to speed up index creation.
|
||||
* The old table is dropped after a ``--convert`` run completes. Take a backup if
|
||||
you need a longer rollback window.
|
||||
|
||||
Settings
|
||||
--------
|
||||
|
||||
|
|
@ -398,6 +472,33 @@ hides some objects from the majority of ORM queries:
|
|||
is_secret = models.BooleanField(default=False)
|
||||
name = models.CharField(max_length=255)
|
||||
|
||||
**AUDITLOG_PARTITIONED**
|
||||
|
||||
Boolean flag that you can use in your project to signal that the
|
||||
``auditlog_logentry`` table is managed as a PostgreSQL partitioned table. The
|
||||
core package does not enable partitioning automatically—you must run the
|
||||
``auditlogpartition`` command described above. Default: ``False``.
|
||||
|
||||
**AUDITLOG_PARTITION_BY**
|
||||
|
||||
Name of the column used as the partition key. Only ``"timestamp"`` is currently
|
||||
supported. Default: ``"timestamp"``.
|
||||
|
||||
**AUDITLOG_PARTITION_INTERVAL**
|
||||
|
||||
Interval used when creating partitions. The management command currently
|
||||
supports monthly partitions. Default: ``"month"``.
|
||||
|
||||
**AUDITLOG_PARTITION_AHEAD_MONTHS**
|
||||
|
||||
How many future months ``auditlogpartition init`` / ``create`` will pre-create
|
||||
by default. You can override per command via ``--ahead``. Default: ``3``.
|
||||
|
||||
**AUDITLOG_PARTITION_RETENTION_MONTHS**
|
||||
|
||||
Optional retention window in months. ``auditlogpartition prune`` uses this value
|
||||
when ``--retention-months`` is omitted. Default: ``None`` (no automatic pruning).
|
||||
|
||||
objects = SecretManager()
|
||||
|
||||
In this example, when ``AUDITLOG_USE_BASE_MANAGER`` is set to `True`, objects
|
||||
|
|
|
|||
Loading…
Reference in a new issue