index.md

"""TEC-317: Re-run the PartnerInvoicingSettings backfill, scoped per company.

Migration 0004_backfill_default_payment_method_uid ran before 0005 turned the table's primary key from partner_uid (single column) into the composite (partner_uid, company_uid). While the PK was still single-column, step 1 of that migration (bulk_create(..., ignore_conflicts=True)) could only keep ONE settings row per partner, so any debtor invoiced by more than one of our companies ended up with missing rows. Step 2 also filtered its .update() on partner_uid only, so multi-company partners got whichever company's payment method was processed first on ALL their rows.

This script redoes both steps correctly, now that the composite PK exists:

  • Step 1: create every missing (partner_uid, company_uid) pair that was actually invoiced with bank information.
  • Step 2: backfill default_payment_method_uid from the partner's latest invoice, scoped to the bank accounts owned by that company_uid.

WHY IT IS CHUNKED (not an OPEScript): prod has a Postgres transaction_timeout; the OPE harness wraps everything in a single transaction.atomic(), and the original per-row design blew that budget (migration 0004 had to use atomic = False for the same reason). Here every read is bounded — invoices are scanned ONCE via keyset pagination (pk > last LIMIT batch), so no single statement scans the whole table — and every write happens in its own small transaction. No transaction can outlive the timeout.

It reads the invoices/PaymentMethod ORMs directly, exactly as migration 0004 does — this is a one-shot backfill mirroring that migration, not application code.

Run via:
    dd run_script 2026-05-26-backfill-partner-invoicing-default-payment-method
    dd run_script 2026-05-26-backfill-partner-invoicing-default-payment-method --commit
    dd run_script 2026-05-26-backfill-partner-invoicing-default-payment-method --env prod-eu           # dry-run, gated
    dd run_script 2026-05-26-backfill-partner-invoicing-default-payment-method --env prod-eu --commit  # persist
"""

import logging from collections.abc import Iterable, Iterator from datetime import date from itertools import islice

from django.db import connection, transaction from django.utils import timezone

from dashdoc.invoices.models.invoice import Invoice from dashdoc.invoices.models.payment_method import PaymentMethod from dashdoc.tms.financials.partner_invoicing_settings.models.partner_invoicing_settings_orm import ( PartnerInvoicingSettingsORM, )

# Module-level logger. It must be keyed on ``__name__``: a bare ``name`` is not
# defined in this module and would raise NameError at import time.
log = logging.getLogger(__name__)

# Physical table name, used for the pg_catalog primary-key lookup.
TABLE = "financials_partner_invoicing_settings"
# Primary-key columns the table must have before this script may run.
EXPECTED_PK_COLUMNS = {"partner_uid", "company_uid"}

INVOICE_SCAN_BATCH = 10_000  # rows per keyset page over the invoices table
CREATE_BATCH = 1_000  # rows per bulk_create transaction
UPDATE_BATCH = 1_000  # rows per backfill transaction

# Sentinel for invoices without an invoicing_date so they sort behind dated
# ones, matching order_by(invoicing_date.desc(nulls_last=True), "-created").
_DATE_FLOOR = date.min

Pair = tuple[str, str]  # (partner_uid, company_uid), both normalised to str

def _uid(value: object) -> str: """Normalise a uid to its canonical string form.

`PartnerInvoicingSettingsORM.partner_uid/company_uid` are `UUIDField`s
(return `uuid.UUID`), while `Company`/`PaymentMethod`/`InvoicingBankInformation`
expose `uid` via `UUIDMixin` as a `CharField` (return `str`). Both are stored
as the same canonical string, so comparing/keying on `str(...)` is the only
way the two sides line up in memory.
"""
return str(value)

def _batched(iterable: Iterable, size: int) -> Iterator[list]: iterator = iter(iterable) while batch := list(islice(iterator, size)): yield batch

def _primary_key_columns() -> set[str]:
    """Return the column names of ``TABLE``'s actual primary key.

    The answer is read from the Postgres catalog (``pg_index`` joined to
    ``pg_attribute``), so it reflects the live schema rather than Django's
    model state.
    """
    query = """
        SELECT a.attname
        FROM pg_index i
        JOIN pg_attribute a
          ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
        WHERE i.indrelid = %s::regclass AND i.indisprimary
    """
    with connection.cursor() as cur:
        cur.execute(query, [TABLE])
        names: set[str] = set()
        for record in cur.fetchall():
            # Each record is a 1-tuple holding the attribute (column) name.
            names.add(record[0])
        return names

def _build_bank_maps() -> tuple[dict[str, str], dict[tuple[str, str], str], dict[str, set[str]]]:
    """Resolve bank-account uids to their owning company and payment method.

    Reads PaymentMethod rows that have a bank account rather than scanning
    invoices.

    Returns:
      company_by_bank:    bank_uid -> company_uid (first payment method wins,
                          intended to match migration 0004's `.first()`).
      pm_by_bank_company: (bank_uid, company_uid) -> payment_method_uid
                          (first payment method wins).
      banks_by_company:   company_uid -> {bank_uid, ...}.
    """
    company_by_bank: dict[str, str] = {}
    pm_by_bank_company: dict[tuple[str, str], str] = {}
    banks_by_company: dict[str, set[str]] = {}

    payment_methods = (
        PaymentMethod.objects.filter(bank_account__isnull=False)
        # "First wins" is only meaningful with a stable order. Without an
        # explicit one the winner depends on whatever order the database
        # returns rows in, and could differ between a dry-run and the
        # committed run. `QuerySet.first()` falls back to pk ordering on an
        # unordered queryset, so pk order is used here as well.
        .order_by("pk")
        .values("bank_account__uid", "created_by__uid", "uid")
        .iterator()
    )
    for payment_method in payment_methods:
        # bank_uid is kept as returned (not passed through _uid) so that it
        # compares equal to the raw `bank_information__uid` values read from
        # invoices by the scanning step.
        bank_uid = payment_method["bank_account__uid"]
        company_uid = _uid(payment_method["created_by__uid"])
        pm_uid = _uid(payment_method["uid"])

        # setdefault keeps the first value seen for a key.
        company_by_bank.setdefault(bank_uid, company_uid)
        pm_by_bank_company.setdefault((bank_uid, company_uid), pm_uid)
        banks_by_company.setdefault(company_uid, set()).add(bank_uid)

    return company_by_bank, pm_by_bank_company, banks_by_company

def _scan_invoices(
    company_by_bank: dict[str, str],
) -> tuple[set[Pair], dict[Pair, str]]:
    """Single keyset-paginated pass over the invoices table.

    Args:
      company_by_bank: bank_uid -> owning company_uid. Invoices whose bank
                       account is not a key of this map are ignored.

    Returns:
      wanted:      every (partner_uid, company_uid) pair implied by invoicing.
      latest_bank: pair -> bank_uid of that pair's most recent invoice
                   (invoicing_date desc, nulls last, then created desc).
    """
    wanted: set[Pair] = set()
    # pair -> (sort_key, bank_uid); sort_key = (has_date, date, created)
    best: dict[Pair, tuple[tuple[bool, date, object], str]] = {}

    last_pk = 0  # NOTE(review): assumes integer pks greater than 0 — confirm
    scanned = 0
    while True:
        # Paginate by pure pk ranges so every query reads at most
        # INVOICE_SCAN_BATCH rows in pk order — bounded regardless of how
        # sparse `bank_information` is. The null/company filtering happens below.
        rows = list(
            Invoice.objects.filter(pk__gt=last_pk)
            .order_by("pk")
            .values_list(
                "pk", "debtor__uid", "bank_information__uid", "invoicing_date", "created"
            )[:INVOICE_SCAN_BATCH]
        )
        if not rows:
            break

        for _pk, debtor_uid, bank_uid, invoicing_date, created in rows:
            if bank_uid is None:
                continue
            if debtor_uid is None:
                # Without this guard `_uid(None)` would produce the literal
                # string "None" and register a bogus (partner, company) pair.
                continue
            company_uid = company_by_bank.get(bank_uid)
            if company_uid is None:
                continue
            pair: Pair = (_uid(debtor_uid), company_uid)
            wanted.add(pair)

            # Dated invoices outrank undated ones (True > False); _DATE_FLOOR
            # only keeps the tuple comparable when invoicing_date is None.
            sort_key = (invoicing_date is not None, invoicing_date or _DATE_FLOOR, created)
            current = best.get(pair)
            # Strict ">" means that on an exact tie the invoice seen first
            # (lowest pk) is kept.
            if current is None or sort_key > current[0]:
                best[pair] = (sort_key, bank_uid)

        last_pk = rows[-1][0]
        scanned += len(rows)
        print(f"  scanned {scanned} invoices (last pk={last_pk})")

    latest_bank = {pair: bank_uid for pair, (_, bank_uid) in best.items()}
    return wanted, latest_bank

def _create_missing_pairs(wanted: set[Pair], *, dry_run: bool) -> int:
    """Step 1: insert a settings row for every wanted pair that has none.

    Args:
      wanted:  (partner_uid, company_uid) pairs that should exist.
      dry_run: when true, each chunk's transaction is marked for rollback so
               nothing is persisted.

    Returns:
      The number of rows submitted for creation. Because inserts use
      ``ignore_conflicts=True``, rows skipped on conflict are still counted.
    """
    already_there: set[Pair] = set()
    settings_keys = PartnerInvoicingSettingsORM.objects.values_list(
        "partner_uid", "company_uid"
    )
    for raw_partner, raw_company in settings_keys.iterator():
        already_there.add((_uid(raw_partner), _uid(raw_company)))

    # Sorted so chunks are processed in a reproducible order.
    missing = sorted(wanted.difference(already_there))

    submitted = 0
    stamp = timezone.now()
    for chunk in _batched(missing, CREATE_BATCH):
        # One short transaction per chunk.
        with transaction.atomic():
            new_rows = [
                PartnerInvoicingSettingsORM(
                    partner_uid=partner_uid,
                    company_uid=company_uid,
                    created_at=stamp,
                    updated_at=stamp,
                )
                for partner_uid, company_uid in chunk
            ]
            PartnerInvoicingSettingsORM.objects.bulk_create(
                new_rows, ignore_conflicts=True
            )
            if dry_run:
                transaction.set_rollback(True)
        submitted += len(chunk)
    return submitted

def _backfill_defaults(
    latest_bank: dict[Pair, str],
    pm_by_bank_company: dict[tuple[str, str], str],
    *,
    dry_run: bool,
) -> int:
    """Step 2: set ``default_payment_method_uid`` on rows where it is null.

    Args:
      latest_bank:        pair -> bank_uid of the pair's most recent invoice.
      pm_by_bank_company: (bank_uid, company_uid) -> payment_method_uid.
      dry_run:            when true, each chunk's transaction is marked for
                          rollback so nothing is persisted.

    Returns:
      The total row count reported by the per-pair ``.update()`` calls.
    """
    # Rows still missing a default. On --commit this runs after step 1 commits,
    # so it also covers freshly-created rows; on a dry-run those were rolled
    # back, so the count only reflects pre-existing nulls.
    pending = PartnerInvoicingSettingsORM.objects.filter(
        default_payment_method_uid__isnull=True
    ).values_list("partner_uid", "company_uid")

    targets: list[tuple[Pair, str]] = []
    for raw_partner, raw_company in pending.iterator():
        pair: Pair = (_uid(raw_partner), _uid(raw_company))
        bank_uid = latest_bank.get(pair)
        if bank_uid is None:
            # No invoicing history for this pair: nothing to derive.
            continue
        pm_uid = pm_by_bank_company.get((bank_uid, pair[1]))
        if pm_uid is None:
            # The latest bank has no payment method under this company.
            continue
        targets.append((pair, pm_uid))

    updated = 0
    for chunk in _batched(targets, UPDATE_BATCH):
        with transaction.atomic():
            for (partner_uid, company_uid), pm_uid in chunk:
                # Filtering on company_uid as well is the fix: migration 0004
                # omitted it. The isnull guard avoids overwriting a value set
                # since the rows were read.
                still_null = PartnerInvoicingSettingsORM.objects.filter(
                    partner_uid=partner_uid,
                    company_uid=company_uid,
                    default_payment_method_uid__isnull=True,
                )
                updated += still_null.update(
                    default_payment_method_uid=pm_uid, updated_at=timezone.now()
                )
            if dry_run:
                transaction.set_rollback(True)
    return updated

def _verify(
    latest_bank: dict[Pair, str],
    pm_by_bank_company: dict[tuple[str, str], str],
) -> tuple[int, int]:
    """Read-only post-check over settings rows that still have no default.

    Returns (resolvable_nulls, total_nulls):
      total_nulls:      rows still missing a default_payment_method_uid.
      resolvable_nulls: subset of those whose latest invoiced bank maps to a
                        known payment method for the row's company — i.e. rows
                        the backfill should have set. After a committed run
                        this is expected to be 0; anything else left is a row
                        with no resolvable payment method.
    """
    null_rows = PartnerInvoicingSettingsORM.objects.filter(
        default_payment_method_uid__isnull=True
    ).values_list("partner_uid", "company_uid")

    total = 0
    resolvable = 0
    for raw_partner, raw_company in null_rows.iterator():
        total += 1
        company = _uid(raw_company)
        bank = latest_bank.get((_uid(raw_partner), company))
        if bank is None:
            continue
        if (bank, company) in pm_by_bank_company:
            resolvable += 1
    return resolvable, total

def main(commit: bool = False) -> None:
    """Run the backfill end to end: guard, scan, create, backfill, verify.

    Args:
      commit: when False (the default) the run is a dry-run — every write
              transaction is rolled back. When True the writes persist.

    Raises:
      AssertionError: if the table's primary key is not the expected
                      composite (partner_uid, company_uid).
    """
    dry_run = not commit
    log.info("OPE start", extra={"script": "backfill_partner_invoicing", "dry_run": dry_run})

    pk_columns = _primary_key_columns()
    # Explicit raise rather than `assert`: asserts are stripped under
    # `python -O`, and this guard must never be skipped. The exception type
    # stays AssertionError so existing handling is unaffected.
    if pk_columns != EXPECTED_PK_COLUMNS:
        raise AssertionError(
            f"{TABLE} primary key is {sorted(pk_columns)}, expected "
            f"{sorted(EXPECTED_PK_COLUMNS)}. Migration 0005 (composite PK) is not "
            "effectively applied on this database — run its ALTER TABLE before backfilling."
        )

    print("Building bank → company / payment-method maps...")
    company_by_bank, pm_by_bank_company, banks_by_company = _build_bank_maps()
    print(f"  {len(company_by_bank)} bank accounts across {len(banks_by_company)} companies")

    print("Scanning invoices (keyset paginated)...")
    wanted, latest_bank = _scan_invoices(company_by_bank)
    print(f"  {len(wanted)} (partner, company) pairs implied by invoicing")

    print("Step 1: creating missing pairs...")
    created = _create_missing_pairs(wanted, dry_run=dry_run)

    print("Step 2: backfilling default_payment_method_uid...")
    updated = _backfill_defaults(latest_bank, pm_by_bank_company, dry_run=dry_run)

    print(
        f"Backfill done ({'dry-run, rolled back' if dry_run else 'COMMITTED'}): "
        f"created={created}, updated={updated}"
    )

    print("Verifying remaining null defaults...")
    resolvable_nulls, total_nulls = _verify(latest_bank, pm_by_bank_company)
    expectation = (
        "expected 0 after --commit"
        if not dry_run
        else "reflects pre-write state on a dry-run (writes were rolled back)"
    )
    print(
        f"  {total_nulls} rows still have no default; {resolvable_nulls} of them have "
        f"resolvable invoicing history ({expectation})"
    )
    if not dry_run and resolvable_nulls:
        print(
            f"  ⚠️  {resolvable_nulls} resolvable rows were left unset — investigate "
            "before flipping the feature flag."
        )

    # NB: `created`/`updated` would collide with reserved LogRecord attributes
    # (`LogRecord.created` is the record timestamp), so use distinct keys.
    log.info(
        "OPE done",
        extra={
            "dry_run": dry_run,
            "created_count": created,
            "updated_count": updated,
            "resolvable_nulls_remaining": resolvable_nulls,
            "total_nulls_remaining": total_nulls,
        },
    )