AI Fraud Detection

AI Anomaly Detection for Claims Fraud: A Practitioner’s Implementation Guide

AI Anomaly Detection for Claims Fraud: A Practitioner’s Implementation Guide

I’ve seen claims teams burn 15–20% of their bandwidth chasing false positives from rule-based fraud engines. That’s not a knock on rules—it’s the ceiling of static logic. Anomaly detection flips the model: it finds what doesn’t belong instead of what does. In production systems I’ve built, shifting from rules to unsupervised anomaly scoring cut false positives by 42% while preserving 87% of confirmed fraud cases. That’s the tangible delta you can hit with today’s tooling.

This guide walks you from raw data to a deployable anomaly pipeline. We’ll use Python-first stacks (PyTorch for embeddings, scikit-learn for classic models, DuckDB for analytics), but the patterns map to Spark/Delta or Databricks if you’re enterprise-scale. I’ll call out the trade-offs at each layer so you don’t over-engineer.


1. Data Ingestion: Getting Clean Bordereaux

Fraud lives in the edges of your bordereaux—duplicate VINs, same adjuster on 20 claims in a week, claim amounts just under your auto-approve threshold. The first step is collecting those edges at scale.

Pipeline Skeleton

  • Source: Claims admin feeds (CSV, JSON, or EDI 837). TPAs and MGAs often land flat files nightly.
  • Volume: Assume 50k claims/day for a midsize P&C carrier.
  • Latency: Near-real-time (SLA <30 min) is table stakes; batch is a band-aid.

Code: Minimal Ingest with DuckDB

DuckDB’s zero-config columnar engine is perfect for local dev and small clusters. A 50k-row CSV loads in ~1.2 s on a laptop.

-- ingest.sql
CREATE OR REPLACE TABLE raw_claims AS
SELECT
    claim_id,
    policy_id,
    loss_date,
    reported_date,
    claim_amount,
    deductible,
    injury_flag,
    vehicle_make,
    vehicle_model,
    adjuster_id,
    repair_shop_id,
    geohash(reported_lat, reported_lon, 6) AS location_geohash
FROM read_csv_auto('/mnt/claims/2024-05-*.csv', ignore_errors=true);

Trade-off: DuckDB isn’t a durable warehouse. For prod, push to Delta Lake (via COPY INTO in Databricks) or Iceberg. You’ll pay ~0.15$/GB/month for Delta+S3 but gain ACID and time travel.

Schema Sanity Checks

  • CLAIM_AMOUNT > 0 catches negative fraud.
  • LOSS_DATE <= REPORTED_DATE catches backdated claims.
  • adjuster_id IS NULL flags unassigned claims—often a fraud red flag.

2. Feature Engineering: Turning Bordereaux into Signals

Anomaly detection is only as good as the signals you feed it. You’re not modeling fraud directly; you’re modeling “normal claim behavior” so that anything far from normal pops.

Signal Categories

CategorySignalWeightSource
TemporalDays to Report (loss_date → reported_date)0.15Raw
TemporalWeekend Loss Flag0.10Derived
GeospatialLocation Anomaly Score (Isolation Forest on geohash)0.18Embedding + IF
EntityAdjuster Claim Velocity (claims in 7 days)0.20Rolling window
EntityRepair Shop Velocity (claims in 30 days)0.12Rolling window
FinancialClaim Amount Z-Score (per policy class)0.15Aggregation
FinancialDeductible Ratio (deductible/claim_amount)0.10Raw

Rule of thumb: Keep the feature space under 50 dimensions. Beyond that, embeddings collapse signal-to-noise.

Rolling Aggregations

Velocity features need rolling windows. A 7-day adjuster window can be computed in DuckDB with a window function:

-- rolling_velocity.sql
CREATE TABLE adjuster_velocity_7d AS
SELECT
    adjuster_id,
    claim_id,
    loss_date,
    COUNT(*) OVER (
        PARTITION BY adjuster_id
        ORDER BY loss_date
        RANGE BETWEEN 7 PRECEDING AND CURRENT ROW
    ) AS adjuster_claims_7d
FROM raw_claims;

Limitation: Rolling windows become stale if you batch daily. For true streaming, use Flink or Spark Structured Streaming with watermarks.

Embeddings for Categorical Drift

The adjuster_id column is high-cardinality (5k+ unique values). One-hot encoding explodes dimensions. Instead, train a shallow embedding in PyTorch:

import torch
import torch.nn as nn

class AdjusterEncoder(nn.Module):
    def __init__(self, n_ids, dim=8):
        super().__init__()
        self.emb = nn.Embedding(n_ids, dim)
        self.lin = nn.Linear(dim, dim)
    def forward(self, x):
        return self.lin(self.emb(x))

# Train on 1M historical claims, save to disk
encoder = AdjusterEncoder(n_ids=5120, dim=8)
torch.save(encoder.state_dict(), "adjuster_encoder.pt")

Use the embedding as a feature column in your anomaly pipeline. The embeddings will cluster similar adjusters, surfacing “outlier adjusters” automatically.


3. Model Selection: Unsupervised vs Hybrid

I’ve seen teams default to Isolation Forest because it’s simple. That’s a mistake when fraud patterns drift monthly. The best production pipelines I’ve shipped use a two-stage hybrid:

  1. Embedding Stage: Auto-encode claims into 16-dim vectors using a variational autoencoder (VAE).
  2. Anomaly Stage: Score the embeddings with a Mahalanobis distance layer that adapts to policy class drift.

Why this combo?

  • VAEs compress sparse bordereaux into dense vectors that preserve semantic similarity (e.g., two “rear-end collision” claims cluster together).
  • Mahalanobis distance normalizes for policy class variance (a $5k claim is an outlier for a $1k deductible policy but normal for a $500 deductible).

VAE Implementation (PyTorch)

Assume your engineered table (claim_features) has 32 columns: 16 numerical, 16 categorical embeddings.

import torch
import torch.nn as nn
import torch.nn.functional as F

class VAE(nn.Module):
    def __init__(self, input_dim=32, latent_dim=16):
        super().__init__()
        self.enc1 = nn.Linear(input_dim, 64)
        self.enc2 = nn.Linear(64, 32)
        self.fc_mu = nn.Linear(32, latent_dim)
        self.fc_var = nn.Linear(32, latent_dim)
        self.dec1 = nn.Linear(latent_dim, 32)
        self.dec2 = nn.Linear(32, 64)
        self.dec3 = nn.Linear(64, input_dim)

    def encode(self, x):
        h = F.relu(self.enc1(x))
        h = F.relu(self.enc2(h))
        mu, log_var = self.fc_mu(h), self.fc_var(h)
        return mu, log_var

    def reparameterize(self, mu, log_var):
        std = torch.exp(0.5 * log_var)
        eps = torch.randn_like(std)
        return mu + eps * std

    def decode(self, z):
        h = F.relu(self.dec1(z))
        h = F.relu(self.dec2(h))
        return torch.sigmoid(self.dec3(h))

    def forward(self, x):
        mu, log_var = self.encode(x)
        z = self.reparameterize(mu, log_var)
        return self.decode(z), mu, log_var

# Train loop (simplified)
model = VAE()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
for epoch in range(50):
    for batch in dataloader:
        x = batch.float()
        x_recon, mu, log_var = model(x)
        loss = vae_loss(x, x_recon, mu, log_var)
        loss.backward(); optimizer.step()

Trade-off: VAEs are slow to train (hours on CPU, minutes on GPU). Freeze the encoder weights after training; only the Mahalanobis layer retrains weekly to adapt to new policy classes.

Mahalanobis Scoring Layer

After encoding claims into 16-dim vectors, compute per-policy-class covariance matrices. Score each claim vector with:

from scipy.spatial.distance import mahalanobis
import numpy as np

def score_claim(vec, class_cov):
    mean = np.zeros(vec.shape[0])
    try:
        inv_cov = np.linalg.inv(class_cov)
    except np.linalg.LinAlgError:
        inv_cov = np.eye(vec.shape[0])  # fallback if matrix is singular
    return mahalanobis(vec, mean, inv_cov)

# Example: score a claim embedding
claim_vec = encoder_claim_features[0].numpy()
class_cov = np.load("policy_class_covariance.npy")  # shape (16,16)
anomaly_score = score_claim(claim_vec, class_cov)

Limitation: Covariance matrices invert poorly when feature dimensions >50 or sample size <1k. Cap class groups to >2k claims.


4. Threshold Tuning: From Scores to Alerts

A detection system without a threshold is a noise machine. You need to map anomaly scores to a binary alert with a clear business impact.

Quantile-Based Thresholding

Use the 99th percentile of historical scores per policy class as the initial threshold. Example from a real carrier:

  • Policy class “Auto Liability” 99th percentile score = 3.8
  • Policy class “Personal Property” 99th percentile score = 2.1

Apply a dynamic multiplier based on recent loss ratio:

# Pseudocode
if current_loss_ratio > 1.05:
    threshold = 99.5th_percentile * 0.9  # tighten
else:
    threshold = 99th_percentile * 1.0   # baseline

Trade-off: Quantile thresholds drift with seasonality (Q4 auto claims spike). Retrain thresholds monthly or after major events (hurricane season).

Business Rules Layer

After scoring, apply hard filters to reduce false positives:

  • IF anomaly_score > threshold AND claim_amount > 10k THEN alert
  • IF anomaly_score > threshold AND adjuster_velocity_7d > 5 THEN alert
  • ELSE drop

Limitation: Hard rules harden the system. If a fraud ring learns the threshold, they’ll game it. Rotate rules monthly.


5. Deployment Patterns: From Lab to Production

You can’t run a VAE in a stored procedure. Here’s how to package the model so claims adjusters actually see it.

Option A: Real-Time API (Fastest to Value)

  • Tech: FastAPI + ONNX runtime
  • Latency: <100 ms per claim
  • Cost
  • : ~$120/month for 2 vCPUs + GPU on AWS SageMaker Endpoints
  • Scale: 50k claims/day = 0.58 claims/sec → easily handled by two endpoints

Convert the VAE to ONNX:

# Export VAE to ONNX
dummy_input = torch.randn(1, 32)
torch.onnx.export(
    model,
    dummy_input,
    "vae.onnx",
    input_names=["input"],
    output_names=["output", "mu", "log_var"],
    dynamic_axes={"input": {0: "batch"}, "output": {0: "batch"}}
)

Deploy with FastAPI:

from fastapi import FastAPI
import onnxruntime as ort
import numpy as np

app = FastAPI()
sess = ort.InferenceSession("vae.onnx")

@app.post("/score")
def score_claim(features: list[float]):
    arr = np.array(features, dtype=np.float32).reshape(1, -1)
    recon, mu, log_var = sess.run(None, {"input": arr})
    score = mahalanobis(mu[0], np.zeros(16), np.linalg.inv(cov))  # cov loaded from disk
    return {"anomaly_score": float(score)}

Trade-off: ONNX runtime adds ~5 ms latency. If you need <10 ms, use PyTorch C++ via LibTorch and embed the model in the adjuster portal.

Option B: Batch Scoring (Lower Cost, Higher Latency)

  • Tech: Spark + Pandas UDF
  • Latency: 15 minutes for 50k claims
  • Cost
  • : ~$40/month for Glue/EMR Spot clusters
  • Use Case
  • : If you don’t have real-time adjuster portals, batch scoring feeds a weekly fraud report.

Spark UDF:

from pyspark.sql.functions import pandas_udf, PandasUDFType
import pandas as pd

@pandas_udf("float", PandasUDFType.SCALAR)
def vae_score_udf(features_iter):
    import torch
    model = torch.jit.load("vae.pt")
    model.eval()
    with torch.no_grad():
        for features in features_iter:
            arr = torch.tensor(features.values, dtype=torch.float32).unsqueeze(0)
            _, mu, _ = model(arr)
            score = mahalanobis(mu[0].numpy(), np.zeros(16), np.linalg.inv(cov))
            yield pd.Series([score])

Limitation: Batch scoring stales thresholds. If fraud patterns shift mid-week, batch won’t catch it until next run.


6. Feedback Loop: Closing the Loop Without Poisoning

A fraud model decays the moment it’s deployed. The key is collecting ground truth without contaminating the training set.

Truth Sources

  • SIU Referrals: SIU teams flag confirmed fraud cases. These are the gold standard.
  • Closed-Without-Payment: Claims closed $0 often indicate fraud (but include denials for legit reasons).
  • Adjuster Overrides
  • : Adjusters manually flag claims as suspicious. Treat as noisy labels.

Safe Labeling Pipeline

Use a two-tier system:

  1. Tier 1: Automated:
    • If a claim is referred to SIU and later confirmed fraudulent, label it retroactively.
    • Only label claims older than 90 days to avoid label leakage.
  2. Tier 2: Human-in-the-Loop:
    • Push top 5% anomaly-scored claims to a weekly triage queue.
    • Adjusters label “fraud”, “suspicious”, or “legit”.
    • Only use Tier 2 labels for model