"""TEC-317: Re-run the PartnerInvoicingSettings backfill, scoped per company.
Migration 0004_backfill_default_payment_method_uid ran before 0005 turned the
table's primary key from partner_uid (single column) into the composite
(partner_uid, company_uid). While the PK was still single-column, step 1 of
that migration (bulk_create(..., ignore_conflicts=True)) could only keep ONE
settings row per partner, so any debtor invoiced by more than one of our
companies ended up with missing rows. Step 2 also filtered its .update() on
partner_uid only, so multi-company partners got whichever company's payment
method was processed first on ALL their rows.
This script redoes both steps correctly, now that the composite PK exists:
- Step 1: create every missing (partner_uid, company_uid) pair that was actually invoiced with bank information.
- Step 2: backfill `default_payment_method_uid` from the partner's latest invoice, scoped to the bank accounts owned by that company_uid.
WHY IT IS CHUNKED (not an OPEScript):
prod has a Postgres transaction_timeout; the OPE harness wraps everything in a
single transaction.atomic(), and the original per-row design blew that budget
(migration 0004 had to use atomic = False for the same reason). Here every
read is bounded — invoices are scanned ONCE via keyset pagination (pk > last
LIMIT batch), so no single statement scans the whole table — and every write
happens in its own small transaction. No transaction can outlive the timeout.
It reads the invoices/PaymentMethod ORMs directly, exactly as migration 0004 does — this is a one-shot backfill mirroring that migration, not application code.
Run via:
    dd run_script 2026-05-26-backfill-partner-invoicing-default-payment-method
    dd run_script 2026-05-26-backfill-partner-invoicing-default-payment-method --commit
    dd run_script 2026-05-26-backfill-partner-invoicing-default-payment-method --env prod-eu           # dry-run, gated
    dd run_script 2026-05-26-backfill-partner-invoicing-default-payment-method --env prod-eu --commit  # persist
"""
import logging from collections.abc import Iterable, Iterator from datetime import date from itertools import islice
from django.db import connection, transaction from django.utils import timezone
from dashdoc.invoices.models.invoice import Invoice from dashdoc.invoices.models.payment_method import PaymentMethod from dashdoc.tms.financials.partner_invoicing_settings.models.partner_invoicing_settings_orm import ( PartnerInvoicingSettingsORM, )
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Physical table inspected by the primary-key check, and the PK columns the
# backfill requires before it is allowed to write anything.
TABLE = "financials_partner_invoicing_settings"
EXPECTED_PK_COLUMNS = {"partner_uid", "company_uid"}

INVOICE_SCAN_BATCH = 10_000  # rows per keyset page over the invoices table
CREATE_BATCH = 1_000  # rows per bulk_create transaction
UPDATE_BATCH = 1_000  # rows per backfill transaction

# Sentinel for invoices without an invoicing_date so they sort behind dated
# ones, matching order_by(invoicing_date.desc(nulls_last=True), "-created").
_DATE_FLOOR = date.min

Pair = tuple[str, str]  # (partner_uid, company_uid), both normalised to str
def _uid(value: object) -> str:
    """Normalise a uid to its canonical string form.

    `PartnerInvoicingSettingsORM.partner_uid/company_uid` are `UUIDField`s
    (return `uuid.UUID`), while `Company`/`PaymentMethod`/`InvoicingBankInformation`
    expose `uid` via `UUIDMixin` as a `CharField` (return `str`). Both are stored
    as the same canonical string, so comparing/keying on `str(...)` is the only
    way the two sides line up in memory.

    Note that this is a plain ``str()`` call: ``None`` becomes the string
    ``"None"``, so callers must filter out missing values beforehand.
    """
    return str(value)


def _batched(iterable: Iterable, size: int) -> Iterator[list]:
    """Yield consecutive lists of at most ``size`` items taken from ``iterable``.

    The last list may be shorter; an empty iterable yields nothing.

    Raises:
        ValueError: if ``size`` is smaller than 1. Being a generator, the
            check runs when iteration starts. A zero size would otherwise
            yield nothing at all and silently skip every batch, while a
            negative one is already rejected by ``islice``.
    """
    if size < 1:
        raise ValueError(f"batch size must be >= 1, got {size}")
    iterator = iter(iterable)
    # islice pulls the next `size` items; an empty list means exhaustion.
    while batch := list(islice(iterator, size)):
        yield batch
def _primary_key_columns() -> set[str]:
    """Columns of the actual primary key on the table, read from pg catalog.

    Joins ``pg_index`` to ``pg_attribute`` for ``TABLE`` and keeps the
    attributes referenced by the index flagged ``indisprimary``.

    Returns:
        The set of primary-key column names (empty if the query returns no
        rows).
    """
    # Adjacent literals concatenate to exactly the original single-line SQL.
    query = (
        " SELECT a.attname"
        " FROM pg_index i"
        " JOIN pg_attribute a"
        " ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)"
        " WHERE i.indrelid = %s::regclass AND i.indisprimary "
    )
    with connection.cursor() as cursor:
        # The table name is bound as a parameter and cast to regclass by
        # Postgres rather than interpolated into the SQL text.
        cursor.execute(query, [TABLE])
        # Fetch while the cursor is still open.
        return {row[0] for row in cursor.fetchall()}
def _build_bank_maps() -> tuple[dict[str, str], dict[tuple[str, str], str], dict[str, set[str]]]:
    """Resolve bank-account UUIDs to their owning company and payment method.

    Reads PaymentMethod (a small table) rather than scanning invoices.

    Returns:
        company_by_bank: bank_uid -> company_uid (first payment method wins,
            matching migration 0004's `.first()`).
        pm_by_bank_company: (bank_uid, company_uid) -> payment_method_uid.
        banks_by_company: company_uid -> {bank_uid, ...}.
    """
    company_by_bank: dict[str, str] = {}
    pm_by_bank_company: dict[tuple[str, str], str] = {}
    banks_by_company: dict[str, set[str]] = {}
    payment_methods = (
        PaymentMethod.objects.filter(bank_account__isnull=False)
        # "First wins" is only meaningful with a defined order. QuerySet.first()
        # falls back to pk ordering on an unordered queryset, so use the same
        # here instead of whatever order the database happens to return.
        .order_by("pk")
        .values("bank_account__uid", "created_by__uid", "uid")
        .iterator()
    )
    for payment_method in payment_methods:
        bank_uid = payment_method["bank_account__uid"]
        creator_uid = payment_method["created_by__uid"]
        if creator_uid is None:
            # Defensive: _uid(None) would produce the literal company "None".
            # NOTE(review): unreachable if created_by is non-nullable — confirm.
            continue
        company_uid = _uid(creator_uid)
        pm_uid = _uid(payment_method["uid"])
        # setdefault keeps the first value seen for each key.
        company_by_bank.setdefault(bank_uid, company_uid)
        pm_by_bank_company.setdefault((bank_uid, company_uid), pm_uid)
        banks_by_company.setdefault(company_uid, set()).add(bank_uid)
    return company_by_bank, pm_by_bank_company, banks_by_company


def _scan_invoices(
    company_by_bank: dict[str, str],
) -> tuple[set[Pair], dict[Pair, str]]:
    """Single keyset-paginated pass over invoices-with-bank.

    Args:
        company_by_bank: bank_uid -> company_uid, as built by
            `_build_bank_maps`. Invoices whose bank is not in this map are
            ignored.

    Returns:
        wanted: every (partner_uid, company_uid) pair implied by invoicing.
        latest_bank: pair -> bank_uid of that pair's most recent invoice
            (invoicing_date desc, nulls last, then created desc).
    """
    wanted: set[Pair] = set()
    # pair -> (sort_key, bank_uid); sort_key = (has_date, date, created)
    best: dict[Pair, tuple[tuple[bool, date, object], str]] = {}
    last_pk = 0
    scanned = 0
    while True:
        # Paginate by pure pk ranges so every query reads exactly
        # INVOICE_SCAN_BATCH rows via the pk index — bounded regardless of how
        # sparse `bank_information` is. The null/company filtering happens below.
        rows = list(
            Invoice.objects.filter(pk__gt=last_pk)
            .order_by("pk")
            .values_list(
                "pk", "debtor__uid", "bank_information__uid", "invoicing_date", "created"
            )[:INVOICE_SCAN_BATCH]
        )
        if not rows:
            break
        for _pk, debtor_uid, bank_uid, invoicing_date, created in rows:
            if bank_uid is None:
                continue
            if debtor_uid is None:
                # Defensive: _uid(None) would create a bogus partner "None".
                # NOTE(review): unreachable if debtor is non-nullable — confirm.
                continue
            company_uid = company_by_bank.get(bank_uid)
            if company_uid is None:
                continue
            pair: Pair = (_uid(debtor_uid), company_uid)
            wanted.add(pair)
            # Tuples compare element-wise: dated beats undated, then later
            # date, then later `created`. On an exact tie the earlier-scanned
            # (lower pk) invoice is kept because the comparison is strict.
            sort_key = (invoicing_date is not None, invoicing_date or _DATE_FLOOR, created)
            current = best.get(pair)
            if current is None or sort_key > current[0]:
                best[pair] = (sort_key, bank_uid)
        last_pk = rows[-1][0]
        scanned += len(rows)
        print(f" scanned {scanned} invoices (last pk={last_pk})")
    latest_bank = {pair: bank_uid for pair, (_, bank_uid) in best.items()}
    return wanted, latest_bank


def _create_missing_pairs(wanted: set[Pair], *, dry_run: bool) -> int:
    """Insert a settings row for every wanted pair that does not exist yet.

    Rows are inserted in chunks of CREATE_BATCH, each chunk in its own
    transaction. On a dry-run every chunk transaction is rolled back.

    Args:
        wanted: (partner_uid, company_uid) pairs that should have a row.
        dry_run: when True, perform the inserts but roll each chunk back.

    Returns:
        Number of rows submitted for creation. Because the insert uses
        ``ignore_conflicts=True``, a row skipped by the database on conflict
        is still counted.
    """
    existing = {
        (_uid(partner_uid), _uid(company_uid))
        for partner_uid, company_uid in PartnerInvoicingSettingsORM.objects.values_list(
            "partner_uid", "company_uid"
        ).iterator()
    }
    # Sorted so the chunks (and therefore the inserts) are deterministic.
    missing = sorted(wanted - existing)
    created = 0
    now = timezone.now()
    for chunk in _batched(missing, CREATE_BATCH):
        with transaction.atomic():
            PartnerInvoicingSettingsORM.objects.bulk_create(
                [
                    PartnerInvoicingSettingsORM(
                        partner_uid=partner_uid,
                        company_uid=company_uid,
                        created_at=now,
                        updated_at=now,
                    )
                    for partner_uid, company_uid in chunk
                ],
                ignore_conflicts=True,
            )
            if dry_run:
                transaction.set_rollback(True)
        created += len(chunk)
    return created
def _backfill_defaults(
    latest_bank: dict[Pair, str],
    pm_by_bank_company: dict[tuple[str, str], str],
    *,
    dry_run: bool,
) -> int:
    """Set default_payment_method_uid on settings rows that still lack one.

    For each row with a null default, looks up the pair's latest invoice bank
    and the payment method for that (bank, company); rows with no match are
    left untouched. Updates run in chunks of UPDATE_BATCH, one transaction
    per chunk, rolled back on a dry-run.

    Args:
        latest_bank: pair -> bank_uid of the pair's most recent invoice.
        pm_by_bank_company: (bank_uid, company_uid) -> payment_method_uid.
        dry_run: when True, perform the updates but roll each chunk back.

    Returns:
        Total number of rows matched by the UPDATE statements.
    """
    # Rows still missing a default. On --commit this runs after step 1 commits,
    # so it also covers freshly-created rows; on a dry-run those were rolled
    # back, so the count only reflects pre-existing nulls.
    null_pairs = [
        (_uid(partner_uid), _uid(company_uid))
        for partner_uid, company_uid in PartnerInvoicingSettingsORM.objects.filter(
            default_payment_method_uid__isnull=True
        )
        .values_list("partner_uid", "company_uid")
        .iterator()
    ]

    targets: list[tuple[Pair, str]] = []
    for pair in null_pairs:
        bank_uid = latest_bank.get(pair)
        if bank_uid is None:
            continue
        pm_uid = pm_by_bank_company.get((bank_uid, pair[1]))
        if pm_uid is not None:
            targets.append((pair, pm_uid))

    updated = 0
    for chunk in _batched(targets, UPDATE_BATCH):
        with transaction.atomic():
            for (partner_uid, company_uid), pm_uid in chunk:
                # The isnull filter is repeated so a default set in the
                # meantime is never overwritten.
                updated += PartnerInvoicingSettingsORM.objects.filter(
                    partner_uid=partner_uid,
                    company_uid=company_uid,  # the fix: migration 0004 omitted this
                    default_payment_method_uid__isnull=True,
                ).update(default_payment_method_uid=pm_uid, updated_at=timezone.now())
            if dry_run:
                transaction.set_rollback(True)
    return updated


def _verify(
    latest_bank: dict[Pair, str],
    pm_by_bank_company: dict[tuple[str, str], str],
) -> tuple[int, int]:
    """Read-only post-check (bounded scan of the settings table).

    Returns (resolvable_nulls, total_nulls):
        total_nulls: rows still missing a default_payment_method_uid.
        resolvable_nulls: subset of those that DO have invoicing history pointing
            at a known payment method — i.e. rows the backfill should
            have set. After a committed run this must be 0; anything
            left is a legitimately-null row (no resolvable PM).
    """
    resolvable_nulls = 0
    total_nulls = 0
    for partner_uid, company_uid in (
        PartnerInvoicingSettingsORM.objects.filter(default_payment_method_uid__isnull=True)
        .values_list("partner_uid", "company_uid")
        .iterator()
    ):
        total_nulls += 1
        pair: Pair = (_uid(partner_uid), _uid(company_uid))
        bank_uid = latest_bank.get(pair)
        if bank_uid is not None and (bank_uid, pair[1]) in pm_by_bank_company:
            resolvable_nulls += 1
    return resolvable_nulls, total_nulls


def main(commit: bool = False) -> None:
    """Run the full backfill: PK check, bank maps, invoice scan, steps 1-2, verify.

    Args:
        commit: when False (the default) every write transaction is rolled
            back (dry-run); when True the writes are persisted.
            NOTE(review): presumably wired to the `--commit` flag of
            `dd run_script` — confirm against the runner.

    Raises:
        AssertionError: if the table's primary key is not the composite
            (partner_uid, company_uid). Nothing is written in that case.
    """
    dry_run = not commit
    log.info("OPE start", extra={"script": "backfill_partner_invoicing", "dry_run": dry_run})

    pk_columns = _primary_key_columns()
    # Raised explicitly rather than via `assert`: this is a safety gate that
    # must still fire under `python -O`, which strips assert statements. The
    # exception type is unchanged.
    if pk_columns != EXPECTED_PK_COLUMNS:
        raise AssertionError(
            f"{TABLE} primary key is {sorted(pk_columns)}, expected "
            f"{sorted(EXPECTED_PK_COLUMNS)}. Migration 0005 (composite PK) is not "
            "effectively applied on this database — run its ALTER TABLE before backfilling."
        )

    print("Building bank → company / payment-method maps...")
    company_by_bank, pm_by_bank_company, banks_by_company = _build_bank_maps()
    print(f" {len(company_by_bank)} bank accounts across {len(banks_by_company)} companies")

    print("Scanning invoices (keyset paginated)...")
    wanted, latest_bank = _scan_invoices(company_by_bank)
    print(f" {len(wanted)} (partner, company) pairs implied by invoicing")

    print("Step 1: creating missing pairs...")
    created = _create_missing_pairs(wanted, dry_run=dry_run)

    print("Step 2: backfilling default_payment_method_uid...")
    updated = _backfill_defaults(latest_bank, pm_by_bank_company, dry_run=dry_run)
    print(
        f"Backfill done ({'dry-run, rolled back' if dry_run else 'COMMITTED'}): "
        f"created={created}, updated={updated}"
    )

    print("Verifying remaining null defaults...")
    resolvable_nulls, total_nulls = _verify(latest_bank, pm_by_bank_company)
    expectation = (
        "expected 0 after --commit"
        if not dry_run
        else "reflects pre-write state on a dry-run (writes were rolled back)"
    )
    print(
        f" {total_nulls} rows still have no default; {resolvable_nulls} of them have "
        f"resolvable invoicing history ({expectation})"
    )
    if not dry_run and resolvable_nulls:
        print(
            f" ⚠️ {resolvable_nulls} resolvable rows were left unset — investigate "
            "before flipping the feature flag."
        )

    # NB: `created`/`updated` would collide with reserved LogRecord attributes
    # (`LogRecord.created` is the record timestamp), so use distinct keys.
    log.info(
        "OPE done",
        extra={
            "dry_run": dry_run,
            "created_count": created,
            "updated_count": updated,
            "resolvable_nulls_remaining": resolvable_nulls,
            "total_nulls_remaining": total_nulls,
        },
    )