diff --git a/engine/configs/rds-refresh.example.yaml b/engine/configs/rds-refresh.example.yaml new file mode 100644 index 00000000..300d5ab8 --- /dev/null +++ b/engine/configs/rds-refresh.example.yaml @@ -0,0 +1,95 @@ +# Example configuration for rds-refresh component +# +# This component automates DBLab full refresh using temporary RDS/Aurora clones. +# Copy this file and customize for your environment. +# +# For Lambda deployment, see deploy/rds-refresh/template.yaml +# For CLI usage: rds-refresh -config rds-refresh.yaml + +# Source database configuration +source: + # Type of source database: + # - "rds" for RDS DB instance + # - "aurora-cluster" for Aurora cluster + type: rds + + # RDS DB instance identifier or Aurora cluster identifier + identifier: production-db + + # Optional: Specific snapshot identifier to use + # If empty, the latest automated snapshot will be used + # snapshotIdentifier: rds:production-db-2024-01-15-02-00 + +# Temporary clone configuration +clone: + # Instance class for the clone (can be smaller than production) + instanceClass: db.t3.medium + + # DB subnet group (must be in a VPC accessible from DBLab Engine) + subnetGroup: default-vpc-subnet + + # VPC security groups for the clone + # Must allow inbound connections from DBLab Engine on PostgreSQL port + securityGroups: + - sg-12345678 + - sg-87654321 + + # Whether the clone should be publicly accessible + # Set to false if DBLab is in the same VPC + publiclyAccessible: false + + # Enable IAM database authentication (recommended) + enableIAMAuth: true + + # Optional: DB parameter group name + # parameterGroup: custom-postgres-params + + # Optional: DB option group name (RDS only) + # optionGroup: custom-options + + # Optional: Cluster parameter group (Aurora only) + # clusterParameterGroup: aurora-postgres-params + + # Optional: Engine version override + # engineVersion: "15.4" + + # Optional: Custom port (default: 5432) + # port: 5432 + + # Optional: Storage type (gp2, gp3, io1, io2) + # storageType: gp3 + + # Deletion protection (should be false for temporary clones) + deletionProtection: false + + # Additional tags for the clone + tags: + Environment: dblab-refresh + Team: platform + CostCenter: engineering + +# DBLab Engine configuration +dblab: + # DBLab Engine API endpoint + apiEndpoint: https://dblab.example.com:2345 + + # Verification token for DBLab API + # Use environment variable expansion for security + token: ${DBLAB_TOKEN} + + # Skip TLS certificate verification (not recommended for production) + insecure: false + + # How often to poll DBLab status during refresh + pollInterval: 30s + + # Maximum time to wait for refresh to complete + timeout: 4h + +# AWS configuration +aws: + # AWS region where RDS/Aurora resources are located + region: us-east-1 + + # Optional: Custom AWS endpoint (for testing with LocalStack) + # endpoint: http://localhost:4566 diff --git a/engine/go.mod b/engine/go.mod index 81d0e24c..cf8d3ada 100644 --- a/engine/go.mod +++ b/engine/go.mod @@ -42,6 +42,22 @@ require ( dario.cat/mergo v1.0.2 // indirect github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect github.com/Microsoft/go-winio v0.6.2 // indirect + github.com/aws/aws-lambda-go v1.51.0 // indirect + github.com/aws/aws-sdk-go-v2 v1.41.0 // indirect + github.com/aws/aws-sdk-go-v2/config v1.32.5 // indirect + github.com/aws/aws-sdk-go-v2/credentials v1.19.5 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.16 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.16 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.16 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.4 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.16 // indirect + github.com/aws/aws-sdk-go-v2/service/rds v1.113.1 // indirect + github.com/aws/aws-sdk-go-v2/service/signin v1.0.4 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.30.7 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.12 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.41.5 // indirect + github.com/aws/smithy-go v1.24.0 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/containerd/errdefs v1.0.0 // indirect github.com/containerd/errdefs/pkg v0.3.0 // indirect diff --git a/engine/go.sum b/engine/go.sum index 16595c52..8d6ff5a5 100644 --- a/engine/go.sum +++ b/engine/go.sum @@ -14,8 +14,40 @@ github.com/ahmetalpbalkan/dlog v0.0.0-20170105205344-4fb5f8204f26 h1:pzStYMLAXM7 github.com/ahmetalpbalkan/dlog v0.0.0-20170105205344-4fb5f8204f26/go.mod h1:ilK+u7u1HoqaDk0mjhh27QJB7PyWMreGffEvOCoEKiY= github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de h1:FxWPpzIjnTlhPwqqXc4/vE0f7GvRjuAsbW+HOIe8KnA= github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de/go.mod h1:DCaWoUhZrYW9p1lxo/cm8EmUOOzAPSEZNGF2DK1dJgw= +github.com/aws/aws-lambda-go v1.51.0 h1:/THH60NjiAs3K5TWet3Gx5w8MdR7oPOQH9utaKYY1JQ= +github.com/aws/aws-lambda-go v1.51.0/go.mod h1:dpMpZgvWx5vuQJfBt0zqBha60q7Dd7RfgJv23DymV8A= github.com/aws/aws-sdk-go v1.44.309 h1:IPJOFBzXekakxmEpDwd4RTKmmBR6LIAiXgNsM51bWbU= github.com/aws/aws-sdk-go v1.44.309/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI= +github.com/aws/aws-sdk-go-v2 v1.41.0 h1:tNvqh1s+v0vFYdA1xq0aOJH+Y5cRyZ5upu6roPgPKd4= +github.com/aws/aws-sdk-go-v2 v1.41.0/go.mod h1:MayyLB8y+buD9hZqkCW3kX1AKq07Y5pXxtgB+rRFhz0= +github.com/aws/aws-sdk-go-v2/config v1.32.5 h1:pz3duhAfUgnxbtVhIK39PGF/AHYyrzGEyRD9Og0QrE8= +github.com/aws/aws-sdk-go-v2/config v1.32.5/go.mod h1:xmDjzSUs/d0BB7ClzYPAZMmgQdrodNjPPhd6bGASwoE= +github.com/aws/aws-sdk-go-v2/credentials v1.19.5 h1:xMo63RlqP3ZZydpJDMBsH9uJ10hgHYfQFIk1cHDXrR4= +github.com/aws/aws-sdk-go-v2/credentials v1.19.5/go.mod h1:hhbH6oRcou+LpXfA/0vPElh/e0M3aFeOblE1sssAAEk= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.16 h1:80+uETIWS1BqjnN9uJ0dBUaETh+P1XwFy5vwHwK5r9k= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.16/go.mod h1:wOOsYuxYuB/7FlnVtzeBYRcjSRtQpAW0hCP7tIULMwo= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.16 h1:rgGwPzb82iBYSvHMHXc8h9mRoOUBZIGFgKb9qniaZZc= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.16/go.mod h1:L/UxsGeKpGoIj6DxfhOWHWQ/kGKcd4I1VncE4++IyKA= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.16 h1:1jtGzuV7c82xnqOVfx2F0xmJcOw5374L7N6juGW6x6U= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.16/go.mod h1:M2E5OQf+XLe+SZGmmpaI2yy+J326aFf6/+54PoxSANc= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4 h1:WKuaxf++XKWlHWu9ECbMlha8WOEGm0OUEZqm4K/Gcfk= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4/go.mod h1:ZWy7j6v1vWGmPReu0iSGvRiise4YI5SkR3OHKTZ6Wuc= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.4 h1:0ryTNEdJbzUCEWkVXEXoqlXV72J5keC1GvILMOuD00E= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.4/go.mod h1:HQ4qwNZh32C3CBeO6iJLQlgtMzqeG17ziAA/3KDJFow= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.16 h1:oHjJHeUy0ImIV0bsrX0X91GkV5nJAyv1l1CC9lnO0TI= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.16/go.mod h1:iRSNGgOYmiYwSCXxXaKb9HfOEj40+oTKn8pTxMlYkRM= +github.com/aws/aws-sdk-go-v2/service/rds v1.113.1 h1:/vV0g/Su8rCTqT57UUYiFU/aRrPXz//fGDn1dkXblG4= +github.com/aws/aws-sdk-go-v2/service/rds v1.113.1/go.mod h1:q02df+DL73LN+jDXzj86tMsI6kKf1kfv61nB684H+o8= +github.com/aws/aws-sdk-go-v2/service/signin v1.0.4 h1:HpI7aMmJ+mm1wkSHIA2t5EaFFv5EFYXePW30p1EIrbQ= +github.com/aws/aws-sdk-go-v2/service/signin v1.0.4/go.mod h1:C5RdGMYGlfM0gYq/tifqgn4EbyX99V15P2V3R+VHbQU= +github.com/aws/aws-sdk-go-v2/service/sso v1.30.7 h1:eYnlt6QxnFINKzwxP5/Ucs1vkG7VT3Iezmvfgc2waUw= +github.com/aws/aws-sdk-go-v2/service/sso v1.30.7/go.mod h1:+fWt2UHSb4kS7Pu8y+BMBvJF0EWx+4H0hzNwtDNRTrg= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.12 h1:AHDr0DaHIAo8c9t1emrzAlVDFp+iMMKnPdYy6XO4MCE= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.12/go.mod h1:GQ73XawFFiWxyWXMHWfhiomvP3tXtdNar/fi8z18sx0= +github.com/aws/aws-sdk-go-v2/service/sts v1.41.5 h1:SciGFVNZ4mHdm7gpD1dgZYnCuVdX1s+lFTg4+4DOy70= +github.com/aws/aws-sdk-go-v2/service/sts v1.41.5/go.mod h1:iW40X4QBmUxdP+fZNOpfmkdMZqsovezbAeO+Ubiv2pk= +github.com/aws/smithy-go v1.24.0 h1:LpilSUItNPFr1eY85RYgTIg5eIEPtvFbskaFcmmIUnk= +github.com/aws/smithy-go v1.24.0/go.mod h1:LEj2LM3rBRQJxPZTB4KuzZkaZYnZPnvgIhb4pu07mx0= github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/cockroachdb/apd v1.1.0 h1:3LFP3629v+1aKXU5Q37mxmRxX/pIu1nijXydLShEq5I= diff --git a/rds-refresh/Dockerfile b/rds-refresh/Dockerfile new file mode 100644 index 00000000..8863d1b6 --- /dev/null +++ b/rds-refresh/Dockerfile @@ -0,0 +1,39 @@ +# Build stage +FROM golang:1.21-alpine AS builder + +RUN apk add --no-cache git ca-certificates + +WORKDIR /build + +# Copy go mod files first for better caching +COPY go.mod go.sum ./ +RUN go mod download + +# Copy source code +COPY *.go ./ + +# Build the binary +ARG VERSION=dev +ARG BUILD_TIME=unknown + +RUN CGO_ENABLED=0 GOOS=linux go build \ + -ldflags="-s -w -X main.version=${VERSION} -X main.buildTime=${BUILD_TIME}" \ + -o /rds-refresh \ + . + +# Runtime stage +FROM alpine:3.19 + +RUN apk add --no-cache ca-certificates tzdata + +# Create non-root user +RUN adduser -D -u 1000 appuser + +WORKDIR /app + +COPY --from=builder /rds-refresh /usr/local/bin/rds-refresh + +USER appuser + +ENTRYPOINT ["/usr/local/bin/rds-refresh"] +CMD ["--help"] diff --git a/rds-refresh/Makefile b/rds-refresh/Makefile new file mode 100644 index 00000000..b41afb23 --- /dev/null +++ b/rds-refresh/Makefile @@ -0,0 +1,53 @@ +.PHONY: build build-linux clean test fmt vet docker-build docker-push + +VERSION ?= $(shell git describe --tags --always --dirty 2>/dev/null || echo "dev") +BUILD_TIME ?= $(shell date -u +"%Y-%m-%dT%H:%M:%SZ") +LDFLAGS = -ldflags "-s -w -X main.version=$(VERSION) -X main.buildTime=$(BUILD_TIME)" +DOCKER_IMAGE ?= postgresai/rds-refresh + +# Build for current platform +build: + go build $(LDFLAGS) -o rds-refresh . + +# Build for Linux (for Docker containers) +build-linux: + GOOS=linux GOARCH=amd64 go build $(LDFLAGS) -o rds-refresh-linux-amd64 . + GOOS=linux GOARCH=arm64 go build $(LDFLAGS) -o rds-refresh-linux-arm64 . + +# Clean build artifacts +clean: + rm -f rds-refresh rds-refresh-linux-* + +# Run tests +test: + go test -v ./... + +# Format code +fmt: + go fmt ./... + +# Run go vet +vet: + go vet ./... + +# Download dependencies +deps: + go mod download + go mod tidy + +# Run locally (requires config.yaml) +run: + go run . -config config.yaml + +# Run dry-run locally +dry-run: + go run . -config config.yaml -dry-run + +# Build Docker image +docker-build: + docker build -t $(DOCKER_IMAGE):$(VERSION) -t $(DOCKER_IMAGE):latest . + +# Push Docker image +docker-push: + docker push $(DOCKER_IMAGE):$(VERSION) + docker push $(DOCKER_IMAGE):latest diff --git a/rds-refresh/README.md b/rds-refresh/README.md new file mode 100644 index 00000000..5f4e31d1 --- /dev/null +++ b/rds-refresh/README.md @@ -0,0 +1,363 @@ +# RDS/Aurora Refresh for DBLab + +Automate DBLab Engine data refresh from RDS/Aurora snapshots without impacting production. + +## The Problem + +DBLab Engine in logical mode needs to connect to a source PostgreSQL database to perform `pg_dump`. Connecting directly to production RDS/Aurora during refresh: +- Creates load on production +- Requires opening network access from DBLab to production +- Can take hours, blocking production resources + +## The Solution + +This tool creates a **temporary clone** from an RDS/Aurora snapshot, points DBLab at the clone for refresh, then deletes the clone. Production is never touched. + +``` +┌─────────────┐ snapshot ┌─────────────┐ +│ Production │ ───────────────► │ Snapshot │ +│ RDS/Aurora │ (automated) │ │ +└─────────────┘ └──────┬──────┘ + │ + │ restore + ▼ +┌─────────────┐ pg_dump ┌─────────────┐ +│ DBLab │ ◄──────────────── │ Temp Clone │ +│ Engine │ │ (deleted) │ +└─────────────┘ └─────────────┘ +``` + +## Prerequisites + +- **DBLab Engine** running in logical mode with API access enabled +- **AWS credentials** with RDS permissions (see [IAM Permissions](#iam-permissions)) +- **Network access** from the temp clone to DBLab Engine (same VPC or peered) +- **RDS automated snapshots** enabled on your source database + +## Quick Start + +### 1. Get the tool + +**Option A: Docker (recommended)** +```bash +docker pull postgresai/rds-refresh:latest +``` + +**Option B: Build from source** +```bash +git clone https://github.com/postgres-ai/database-lab-engine.git +cd database-lab-engine/rds-refresh +make build +``` + +### 2. Create configuration + +```bash +cat > config.yaml << 'EOF' +source: + type: rds # or "aurora-cluster" + identifier: my-production-db # your RDS instance ID + dbName: postgres # database to dump + username: postgres + password: ${DB_PASSWORD} # from environment variable + +clone: + instanceClass: db.t3.medium # can be smaller than prod + subnetGroup: default # must allow DBLab access + securityGroups: + - sg-xxxxxxxxx # must allow inbound from DBLab + +dblab: + apiEndpoint: https://dblab.example.com:2345 + token: ${DBLAB_TOKEN} + timeout: 4h + +aws: + region: us-east-1 +EOF +``` + +### 3. Run + +**Dry run first** (validates config, finds snapshot, no changes made): +```bash +# Docker +docker run --rm \ + -v $(pwd)/config.yaml:/config.yaml \ + -e DB_PASSWORD -e DBLAB_TOKEN \ + -e AWS_ACCESS_KEY_ID -e AWS_SECRET_ACCESS_KEY \ + postgresai/rds-refresh -config /config.yaml -dry-run + +# Binary +export DB_PASSWORD="..." DBLAB_TOKEN="..." +./rds-refresh -config config.yaml -dry-run +``` + +**Full refresh:** +```bash +# Docker +docker run --rm \ + -v $(pwd)/config.yaml:/config.yaml \ + -e DB_PASSWORD -e DBLAB_TOKEN \ + -e AWS_ACCESS_KEY_ID -e AWS_SECRET_ACCESS_KEY \ + postgresai/rds-refresh -config /config.yaml + +# Binary +./rds-refresh -config config.yaml +``` + +## Configuration Reference + +| Field | Required | Description | +|-------|----------|-------------| +| `source.type` | Yes | `rds` or `aurora-cluster` | +| `source.identifier` | Yes | RDS instance or Aurora cluster identifier | +| `source.dbName` | Yes | Database name for DBLab to connect to | +| `source.username` | Yes | Database username | +| `source.password` | Yes | Database password (use `${ENV_VAR}` syntax) | +| `clone.instanceClass` | Yes | Instance class for temp clone (e.g., `db.t3.medium`) | +| `clone.subnetGroup` | No | DB subnet group (uses source's if not set) | +| `clone.securityGroups` | No | Security group IDs for the clone | +| `clone.publiclyAccessible` | No | Make clone publicly accessible (default: false) | +| `dblab.apiEndpoint` | Yes | DBLab Engine API URL | +| `dblab.token` | Yes | DBLab verification token | +| `dblab.timeout` | No | Max wait for refresh (default: 4h) | +| `dblab.pollInterval` | No | Status check interval (default: 30s) | +| `dblab.insecure` | No | Skip TLS verification (default: false) | +| `aws.region` | Yes | AWS region | + +See [config.example.yaml](config.example.yaml) for all options with comments. + +## Scheduling + +The refresh takes 1-4 hours. Schedule it during off-peak hours. + +### Docker with cron + +```bash +# /etc/cron.d/dblab-refresh +0 2 * * 0 root docker run --rm \ + -v /etc/dblab/config.yaml:/config.yaml \ + -e DB_PASSWORD -e DBLAB_TOKEN \ + -e AWS_ACCESS_KEY_ID -e AWS_SECRET_ACCESS_KEY \ + postgresai/rds-refresh -config /config.yaml \ + >> /var/log/dblab-refresh.log 2>&1 +``` + +### Kubernetes CronJob + +```yaml +apiVersion: batch/v1 +kind: CronJob +metadata: + name: dblab-rds-refresh +spec: + schedule: "0 2 * * 0" # Sundays at 2 AM + concurrencyPolicy: Forbid + jobTemplate: + spec: + backoffLimit: 1 + template: + spec: + serviceAccountName: dblab-rds-refresh # IRSA for AWS + containers: + - name: rds-refresh + image: postgresai/rds-refresh:latest + args: ["-config", "/config/config.yaml"] + env: + - name: DB_PASSWORD + valueFrom: + secretKeyRef: + name: dblab-secrets + key: db-password + - name: DBLAB_TOKEN + valueFrom: + secretKeyRef: + name: dblab-secrets + key: token + volumeMounts: + - name: config + mountPath: /config + volumes: + - name: config + configMap: + name: rds-refresh-config + restartPolicy: Never +``` + +### AWS ECS Scheduled Task + +```bash +# Create EventBridge rule +aws events put-rule \ + --name dblab-refresh-weekly \ + --schedule-expression "cron(0 2 ? * SUN *)" + +# Target ECS task (create task definition first) +aws events put-targets \ + --rule dblab-refresh-weekly \ + --targets '[{ + "Id": "1", + "Arn": "arn:aws:ecs:us-east-1:ACCOUNT:cluster/CLUSTER", + "RoleArn": "arn:aws:iam::ACCOUNT:role/ecsEventsRole", + "EcsParameters": { + "TaskDefinitionArn": "arn:aws:ecs:us-east-1:ACCOUNT:task-definition/dblab-rds-refresh", + "TaskCount": 1, + "LaunchType": "FARGATE", + "NetworkConfiguration": { + "awsvpcConfiguration": { + "Subnets": ["subnet-xxx"], + "SecurityGroups": ["sg-xxx"], + "AssignPublicIp": "DISABLED" + } + } + } + }]' +``` + +## IAM Permissions + +Minimal IAM policy for the tool: + +```json +{ + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": [ + "rds:DescribeDBSnapshots", + "rds:DescribeDBClusterSnapshots", + "rds:DescribeDBInstances", + "rds:DescribeDBClusters" + ], + "Resource": "*" + }, + { + "Effect": "Allow", + "Action": [ + "rds:RestoreDBInstanceFromDBSnapshot", + "rds:RestoreDBClusterFromSnapshot", + "rds:CreateDBInstance", + "rds:DeleteDBInstance", + "rds:DeleteDBCluster", + "rds:AddTagsToResource", + "rds:ModifyDBInstance", + "rds:ModifyDBCluster" + ], + "Resource": [ + "arn:aws:rds:*:ACCOUNT_ID:db:dblab-refresh-*", + "arn:aws:rds:*:ACCOUNT_ID:cluster:dblab-refresh-*", + "arn:aws:rds:*:ACCOUNT_ID:snapshot:*", + "arn:aws:rds:*:ACCOUNT_ID:cluster-snapshot:*", + "arn:aws:rds:*:ACCOUNT_ID:subgrp:*", + "arn:aws:rds:*:ACCOUNT_ID:pg:*", + "arn:aws:rds:*:ACCOUNT_ID:og:*" + ] + } + ] +} +``` + +## Network Requirements + +The temporary clone needs to be accessible from DBLab Engine: + +``` +┌─────────────────────────────────────────────────────┐ +│ VPC │ +│ ┌──────────────┐ ┌──────────────┐ │ +│ │ DBLab │ ──5432─► │ Temp Clone │ │ +│ │ Engine │ │ (RDS) │ │ +│ └──────────────┘ └──────────────┘ │ +│ │ +│ Security Group: Allow inbound 5432 from DBLab SG │ +└─────────────────────────────────────────────────────┘ +``` + +- Clone and DBLab should be in the same VPC (or peered VPCs) +- Security group must allow PostgreSQL port (5432) from DBLab +- If using `publiclyAccessible: true`, ensure DBLab can reach public endpoint + +## DBLab Engine Setup + +Your DBLab must be running in **logical mode**. The tool updates DBLab's source config via API before triggering refresh. + +**Important**: The tool communicates with DBLab purely via HTTP API. It can run from anywhere (different machine, container, cloud) - no SSH or filesystem access to the DBLab server is required. DBLab automatically reloads configuration when updated via API. + +Minimal DBLab config: + +```yaml +server: + port: 2345 + +retrieval: + refresh: + timetable: "" # Disable built-in scheduler + skipStartRefresh: true # Don't refresh on startup + + jobs: + - logicalDump + - logicalRestore + - logicalSnapshot + + spec: + logicalDump: + options: + source: + connection: + host: placeholder # Updated by rds-refresh + port: 5432 + dbname: postgres + username: postgres + password: placeholder +``` + +## Workflow Details + +1. **Health Check** - Verify DBLab is reachable and not mid-refresh +2. **Find Snapshot** - Get latest automated snapshot (or specified one) +3. **Create Clone** - Restore snapshot to new RDS instance (`dblab-refresh-YYYYMMDD-HHMMSS`) +4. **Wait for Clone** - Poll until instance is available (10-30 min) +5. **Update DBLab** - PUT `/admin/config` with clone's endpoint (auto-reloads, no restart needed) +6. **Trigger Refresh** - POST `/admin/full-refresh` +7. **Wait for Refresh** - Poll status until complete (1-4 hours) +8. **Delete Clone** - Remove temporary instance (always runs, even on failure) + +## Troubleshooting + +**"No snapshots found"** +- Ensure automated backups are enabled on your RDS instance +- Check the `source.identifier` matches exactly + +**"Clone not accessible"** +- Verify security group allows inbound 5432 +- Check subnet group has proper routing to DBLab +- Try `publiclyAccessible: true` for testing + +**"Config update failed"** +- Verify DBLab API endpoint and token +- Check DBLab is running in logical mode +- Ensure `/admin/config` endpoint is enabled + +**"Refresh timeout"** +- Increase `dblab.timeout` (default 4h) +- Check DBLab logs for pg_dump/pg_restore errors +- Large databases take longer - consider partial dumps + +**"AWS credentials error"** +- For ECS: attach IAM role to task definition +- For Kubernetes: use IRSA (IAM Roles for Service Accounts) +- For local: set `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` + +## Cost + +You pay for the clone instance only while it exists: +- **db.t3.medium**: ~$0.068/hour → ~$0.34 for 5-hour refresh +- **db.r5.large**: ~$0.24/hour → ~$1.20 for 5-hour refresh + +Storage is not duplicated (snapshot-based restore uses copy-on-write). + +## License + +Apache 2.0 - [Postgres.ai](https://postgres.ai) diff --git a/rds-refresh/config.example.yaml b/rds-refresh/config.example.yaml new file mode 100644 index 00000000..93b59f2a --- /dev/null +++ b/rds-refresh/config.example.yaml @@ -0,0 +1,94 @@ +# Example configuration for rds-refresh +# +# Copy this file to config.yaml and customize for your environment. + +# Source database configuration +source: + # Type of source database: + # - "rds" for RDS DB instance + # - "aurora-cluster" for Aurora cluster + type: rds + + # RDS DB instance identifier or Aurora cluster identifier + identifier: production-db + + # Database name to connect to (used when configuring DBLab) + dbName: myapp + + # Database credentials (used when configuring DBLab to connect to clone) + # Use environment variable expansion for security + username: postgres + password: ${DB_PASSWORD} + + # Optional: Specific snapshot identifier to use + # If empty, the latest automated snapshot will be used + # snapshotIdentifier: rds:production-db-2024-01-15-02-00 + +# Temporary clone configuration +clone: + # Instance class for the clone (can be smaller than production) + instanceClass: db.t3.medium + + # DB subnet group (must be in a VPC accessible from DBLab Engine) + subnetGroup: default-vpc-subnet + + # VPC security groups for the clone + # Must allow inbound connections from DBLab Engine on PostgreSQL port + securityGroups: + - sg-12345678 + + # Whether the clone should be publicly accessible + # Set to false if DBLab is in the same VPC + publiclyAccessible: false + + # Enable IAM database authentication (recommended) + enableIAMAuth: true + + # Optional: DB parameter group name + # parameterGroup: custom-postgres-params + + # Optional: DB option group name (RDS only) + # optionGroup: custom-options + + # Optional: Cluster parameter group (Aurora only) + # clusterParameterGroup: aurora-postgres-params + + # Optional: Custom port (default: 5432) + # port: 5432 + + # Optional: Storage type (gp2, gp3, io1, io2) + # storageType: gp3 + + # Deletion protection (should be false for temporary clones) + deletionProtection: false + + # Additional tags for the clone + tags: + Environment: dblab-refresh + Team: platform + +# DBLab Engine configuration +dblab: + # DBLab Engine API endpoint + apiEndpoint: https://dblab.example.com:2345 + + # Verification token for DBLab API + # Use environment variable expansion for security + token: ${DBLAB_TOKEN} + + # Skip TLS certificate verification (not recommended for production) + insecure: false + + # How often to poll DBLab status during refresh + pollInterval: 30s + + # Maximum time to wait for refresh to complete + timeout: 4h + +# AWS configuration +aws: + # AWS region where RDS/Aurora resources are located + region: us-east-1 + + # Optional: Custom AWS endpoint (for testing with LocalStack) + # endpoint: http://localhost:4566 diff --git a/rds-refresh/config.go b/rds-refresh/config.go new file mode 100644 index 00000000..99bc0209 --- /dev/null +++ b/rds-refresh/config.go @@ -0,0 +1,204 @@ +/* +2025 © PostgresAI +*/ + +package main + +import ( + "fmt" + "os" + "time" + + "gopkg.in/yaml.v3" +) + +// Config holds the configuration for the RDS refresh component. +type Config struct { + Source SourceConfig `yaml:"source"` + Clone CloneConfig `yaml:"clone"` + DBLab DBLabConfig `yaml:"dblab"` + AWS AWSConfig `yaml:"aws"` +} + +// SourceConfig defines the source RDS/Aurora database to clone from. +type SourceConfig struct { + // Type specifies the source type: "rds" for RDS instance, "aurora-cluster" for Aurora cluster. + Type string `yaml:"type"` + // Identifier is the RDS DB instance identifier or Aurora cluster identifier. + Identifier string `yaml:"identifier"` + // SnapshotIdentifier is the specific snapshot to use. If empty, the latest automated snapshot is used. + SnapshotIdentifier string `yaml:"snapshotIdentifier"` + // DBName is the database name to connect to (used when updating DBLab config). + DBName string `yaml:"dbName"` + // Username is the database username (used when updating DBLab config). + Username string `yaml:"username"` + // Password is the database password (used when updating DBLab config). + Password string `yaml:"password"` +} + +// CloneConfig defines settings for the temporary clone. +type CloneConfig struct { + // InstanceClass is the DB instance class for the clone (e.g., "db.t3.medium"). + InstanceClass string `yaml:"instanceClass"` + // DBSubnetGroupName is the DB subnet group for the clone. + DBSubnetGroupName string `yaml:"subnetGroup"` + // VPCSecurityGroupIDs are the security group IDs to assign to the clone. + VPCSecurityGroupIDs []string `yaml:"securityGroups"` + // PubliclyAccessible determines if the clone should be publicly accessible. + PubliclyAccessible bool `yaml:"publiclyAccessible"` + // Tags are additional tags to add to the clone. + Tags map[string]string `yaml:"tags"` + // ParameterGroupName is the parameter group to use for the clone. + ParameterGroupName string `yaml:"parameterGroup"` + // OptionGroupName is the option group to use for the clone (RDS only). + OptionGroupName string `yaml:"optionGroup"` + // DBClusterParameterGroupName is the cluster parameter group for Aurora clones. + DBClusterParameterGroupName string `yaml:"clusterParameterGroup"` + // Port is the port for the clone. If 0, uses default port. + Port int32 `yaml:"port"` + // EnableIAMAuth enables IAM database authentication. + EnableIAMAuth bool `yaml:"enableIAMAuth"` + // StorageType specifies storage type (gp2, gp3, io1, etc.) for RDS clones. + StorageType string `yaml:"storageType"` + // DeletionProtection enables deletion protection on the clone. + DeletionProtection bool `yaml:"deletionProtection"` +} + +// DBLabConfig defines the DBLab Engine connection settings. +type DBLabConfig struct { + // APIEndpoint is the DBLab Engine API endpoint (e.g., "https://dblab.example.com:2345"). + APIEndpoint string `yaml:"apiEndpoint"` + // Token is the verification token for the DBLab API. + Token string `yaml:"token"` + // Insecure allows connections to DBLab with invalid TLS certificates. + Insecure bool `yaml:"insecure"` + // PollInterval is how often to poll the DBLab status during refresh. + PollInterval Duration `yaml:"pollInterval"` + // Timeout is the maximum time to wait for the refresh to complete. + Timeout Duration `yaml:"timeout"` +} + +// AWSConfig holds AWS-specific settings. +type AWSConfig struct { + // Region is the AWS region where the RDS/Aurora resources are located. + Region string `yaml:"region"` + // Endpoint is a custom AWS endpoint (useful for testing with LocalStack). + Endpoint string `yaml:"endpoint"` +} + +// Duration is a wrapper around time.Duration for YAML parsing. +type Duration time.Duration + +// UnmarshalYAML implements yaml.Unmarshaler for Duration. +func (d *Duration) UnmarshalYAML(value *yaml.Node) error { + var s string + if err := value.Decode(&s); err != nil { + return err + } + + dur, err := time.ParseDuration(s) + if err != nil { + return fmt.Errorf("invalid duration %q: %w", s, err) + } + + *d = Duration(dur) + + return nil +} + +// MarshalYAML implements yaml.Marshaler for Duration. +func (d Duration) MarshalYAML() (interface{}, error) { + return time.Duration(d).String(), nil +} + +// Duration returns the time.Duration value. +func (d Duration) Duration() time.Duration { + return time.Duration(d) +} + +// LoadConfig loads configuration from a YAML file. +func LoadConfig(path string) (*Config, error) { + data, err := os.ReadFile(path) + if err != nil { + return nil, fmt.Errorf("failed to read config file: %w", err) + } + + // Expand environment variables in the config + data = []byte(os.ExpandEnv(string(data))) + + var cfg Config + if err := yaml.Unmarshal(data, &cfg); err != nil { + return nil, fmt.Errorf("failed to parse config file: %w", err) + } + + if err := cfg.Validate(); err != nil { + return nil, fmt.Errorf("invalid configuration: %w", err) + } + + cfg.SetDefaults() + + return &cfg, nil +} + +// Validate checks that the configuration is valid. +func (c *Config) Validate() error { + if c.Source.Type == "" { + return fmt.Errorf("source.type is required (rds or aurora-cluster)") + } + + if c.Source.Type != "rds" && c.Source.Type != "aurora-cluster" { + return fmt.Errorf("source.type must be 'rds' or 'aurora-cluster', got %q", c.Source.Type) + } + + if c.Source.Identifier == "" { + return fmt.Errorf("source.identifier is required") + } + + if c.Source.DBName == "" { + return fmt.Errorf("source.dbName is required") + } + + if c.Source.Username == "" { + return fmt.Errorf("source.username is required") + } + + if c.Source.Password == "" { + return fmt.Errorf("source.password is required") + } + + if c.Clone.InstanceClass == "" { + return fmt.Errorf("clone.instanceClass is required") + } + + if c.DBLab.APIEndpoint == "" { + return fmt.Errorf("dblab.apiEndpoint is required") + } + + if c.DBLab.Token == "" { + return fmt.Errorf("dblab.token is required") + } + + if c.AWS.Region == "" { + return fmt.Errorf("aws.region is required") + } + + return nil +} + +// SetDefaults sets default values for optional configuration fields. +func (c *Config) SetDefaults() { + if c.DBLab.PollInterval == 0 { + c.DBLab.PollInterval = Duration(30 * time.Second) + } + + if c.DBLab.Timeout == 0 { + c.DBLab.Timeout = Duration(4 * time.Hour) + } + + if c.Clone.Tags == nil { + c.Clone.Tags = make(map[string]string) + } + + c.Clone.Tags["ManagedBy"] = "dblab-rds-refresh" + c.Clone.Tags["AutoDelete"] = "true" +} diff --git a/rds-refresh/dblab.go b/rds-refresh/dblab.go new file mode 100644 index 00000000..a0a0eee0 --- /dev/null +++ b/rds-refresh/dblab.go @@ -0,0 +1,309 @@ +/* +2025 © PostgresAI +*/ + +package main + +import ( + "bytes" + "context" + "crypto/tls" + "encoding/json" + "fmt" + "io" + "net/http" + "time" +) + +const ( + verificationHeader = "Verification-Token" + contentTypeJSON = "application/json" +) + +// RetrievalStatus defines status of refreshing data. +type RetrievalStatus string + +const ( + StatusInactive RetrievalStatus = "inactive" + StatusPending RetrievalStatus = "pending" + StatusFailed RetrievalStatus = "failed" + StatusRefreshing RetrievalStatus = "refreshing" + StatusRenewed RetrievalStatus = "renewed" + StatusSnapshotting RetrievalStatus = "snapshotting" + StatusFinished RetrievalStatus = "finished" +) + +// InstanceStatus represents the DBLab Engine status response. +type InstanceStatus struct { + Status *Status `json:"status"` + Retrieving Retrieving `json:"retrieving"` +} + +// Status represents a generic status. +type Status struct { + Code string `json:"code"` + Message string `json:"message"` +} + +// Retrieving represents state of retrieval subsystem. +type Retrieving struct { + Mode string `json:"mode"` + Status RetrievalStatus `json:"status"` + LastRefresh string `json:"lastRefresh"` + NextRefresh string `json:"nextRefresh"` + Alerts map[string]Alert `json:"alerts"` +} + +// Alert describes an alert. +type Alert struct { + Level string `json:"level"` + Message string `json:"message"` +} + +// APIResponse represents a generic API response. +type APIResponse struct { + Status string `json:"status"` + Message string `json:"message"` +} + +// APIError represents an API error response. +type APIError struct { + Code string `json:"code"` + Message string `json:"message"` +} + +// ConfigUpdateRequest represents a request to update DBLab config. +// Uses flat structure matching DBLab's ConfigProjection fields. +type ConfigUpdateRequest struct { + Host *string `json:"host,omitempty"` + Port *int64 `json:"port,omitempty"` + DBName *string `json:"dbname,omitempty"` + Username *string `json:"username,omitempty"` + Password *string `json:"password,omitempty"` +} + +// DBLabClient provides methods to interact with the DBLab Engine API. +type DBLabClient struct { + baseURL string + token string + httpClient *http.Client +} + +// NewDBLabClient creates a new DBLab API client. +func NewDBLabClient(cfg *DBLabConfig, logger Logger) *DBLabClient { + if cfg.Insecure && logger != nil { + logger.Error("WARNING: TLS certificate verification is disabled. This is insecure for production use.") + } + + transport := &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: cfg.Insecure}, + } + + return &DBLabClient{ + baseURL: cfg.APIEndpoint, + token: cfg.Token, + httpClient: &http.Client{ + Transport: transport, + Timeout: 60 * time.Second, + }, + } +} + +// GetStatus returns the current DBLab Engine instance status. +func (c *DBLabClient) GetStatus(ctx context.Context) (*InstanceStatus, error) { + resp, err := c.doRequest(ctx, http.MethodGet, "/status", nil) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + var status InstanceStatus + if err := json.NewDecoder(resp.Body).Decode(&status); err != nil { + return nil, fmt.Errorf("failed to decode status response: %w", err) + } + + return &status, nil +} + +// TriggerFullRefresh triggers a full data refresh on the DBLab Engine. +func (c *DBLabClient) TriggerFullRefresh(ctx context.Context) error { + resp, err := c.doRequest(ctx, http.MethodPost, "/full-refresh", nil) + if err != nil { + return err + } + defer resp.Body.Close() + + var result APIResponse + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { + return fmt.Errorf("failed to decode response: %w", err) + } + + if result.Status != "OK" { + return fmt.Errorf("full refresh failed: %s", result.Message) + } + + return nil +} + +// WaitForRefreshComplete polls the DBLab status until refresh is complete or timeout. +// It first waits for the refresh to start (status changes from finished/inactive), +// then waits for it to complete. This prevents race conditions where stale status +// from a previous refresh could cause premature return. +func (c *DBLabClient) WaitForRefreshComplete(ctx context.Context, pollInterval, timeout time.Duration) error { + timeoutTimer := time.NewTimer(timeout) + defer timeoutTimer.Stop() + + refreshStarted := false + + // checkStatus handles status evaluation and returns (done, error) + checkStatus := func() (bool, error) { + status, err := c.GetStatus(ctx) + if err != nil { + return false, fmt.Errorf("failed to get status: %w", err) + } + + switch status.Retrieving.Status { + case StatusRefreshing, StatusSnapshotting, StatusRenewed, StatusPending: + refreshStarted = true + return false, nil + case StatusFinished: + if !refreshStarted { + return false, nil + } + return true, nil + case StatusFailed: + if !refreshStarted { + return false, nil + } + if len(status.Retrieving.Alerts) > 0 { + for _, alert := range status.Retrieving.Alerts { + return false, fmt.Errorf("refresh failed: %s", alert.Message) + } + } + return false, fmt.Errorf("refresh failed (no details available)") + case StatusInactive: + if refreshStarted { + return false, fmt.Errorf("refresh stopped unexpectedly (status: inactive)") + } + return false, nil + default: + return false, nil + } + } + + // immediate first check + if done, err := checkStatus(); err != nil { + return err + } else if done { + return nil + } + + ticker := time.NewTicker(pollInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-timeoutTimer.C: + if !refreshStarted { + return fmt.Errorf("timeout waiting for refresh to start after %v", timeout) + } + return fmt.Errorf("timeout waiting for refresh to complete after %v", timeout) + case <-ticker.C: + if done, err := checkStatus(); err != nil { + return err + } else if done { + return nil + } + } + } +} + +// IsRefreshInProgress checks if a refresh is currently in progress. +// Considers all active states: refreshing, snapshotting, pending, renewed. +func (c *DBLabClient) IsRefreshInProgress(ctx context.Context) (bool, error) { + status, err := c.GetStatus(ctx) + if err != nil { + return false, err + } + + switch status.Retrieving.Status { + case StatusRefreshing, StatusSnapshotting, StatusPending, StatusRenewed: + return true, nil + default: + return false, nil + } +} + +// Health checks if the DBLab Engine is healthy. +func (c *DBLabClient) Health(ctx context.Context) error { + resp, err := c.doRequest(ctx, http.MethodGet, "/healthz", nil) + if err != nil { + return err + } + defer resp.Body.Close() + + return nil +} + +// UpdateSourceConfig updates the source database connection in DBLab config. +// DBLab automatically reloads the configuration after the update. +func (c *DBLabClient) UpdateSourceConfig(ctx context.Context, host string, port int, dbname, username, password string) error { + port64 := int64(port) + updateReq := ConfigUpdateRequest{ + Host: &host, + Port: &port64, + DBName: &dbname, + Username: &username, + Password: &password, + } + + bodyBytes, err := json.Marshal(updateReq) + if err != nil { + return fmt.Errorf("failed to marshal config update: %w", err) + } + + resp, err := c.doRequest(ctx, http.MethodPut, "/admin/config", bytes.NewReader(bodyBytes)) + if err != nil { + return fmt.Errorf("failed to update DBLab config: %w", err) + } + defer resp.Body.Close() + + return nil +} + +func (c *DBLabClient) doRequest(ctx context.Context, method, path string, body io.Reader) (*http.Response, error) { + url := c.baseURL + path + + req, err := http.NewRequestWithContext(ctx, method, url, body) + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + + req.Header.Set(verificationHeader, c.token) + + if body != nil { + req.Header.Set("Content-Type", contentTypeJSON) + } + + resp, err := c.httpClient.Do(req) + if err != nil { + return nil, fmt.Errorf("request failed: %w", err) + } + + if resp.StatusCode >= http.StatusBadRequest { + defer resp.Body.Close() + + bodyBytes, _ := io.ReadAll(resp.Body) + + var errModel APIError + if err := json.Unmarshal(bodyBytes, &errModel); err == nil && errModel.Message != "" { + return nil, fmt.Errorf("API error (status %d): %s", resp.StatusCode, errModel.Message) + } + + return nil, fmt.Errorf("API error (status %d): %s", resp.StatusCode, string(bodyBytes)) + } + + return resp, nil +} diff --git a/rds-refresh/go.mod b/rds-refresh/go.mod new file mode 100644 index 00000000..a11819b1 --- /dev/null +++ b/rds-refresh/go.mod @@ -0,0 +1,27 @@ +module github.com/postgres-ai/rds-refresh + +go 1.23 + +toolchain go1.24.7 + +require ( + github.com/aws/aws-sdk-go-v2 v1.41.0 + github.com/aws/aws-sdk-go-v2/config v1.32.5 + github.com/aws/aws-sdk-go-v2/service/rds v1.113.1 + gopkg.in/yaml.v3 v3.0.1 +) + +require ( + github.com/aws/aws-sdk-go-v2/credentials v1.19.5 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.16 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.16 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.16 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.4 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.16 // indirect + github.com/aws/aws-sdk-go-v2/service/signin v1.0.4 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.30.7 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.12 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.41.5 // indirect + github.com/aws/smithy-go v1.24.0 // indirect +) diff --git a/rds-refresh/go.sum b/rds-refresh/go.sum new file mode 100644 index 00000000..2907c9d4 --- /dev/null +++ b/rds-refresh/go.sum @@ -0,0 +1,42 @@ +github.com/aws/aws-lambda-go v1.51.0 h1:/THH60NjiAs3K5TWet3Gx5w8MdR7oPOQH9utaKYY1JQ= +github.com/aws/aws-lambda-go v1.51.0/go.mod h1:dpMpZgvWx5vuQJfBt0zqBha60q7Dd7RfgJv23DymV8A= +github.com/aws/aws-sdk-go-v2 v1.41.0 h1:tNvqh1s+v0vFYdA1xq0aOJH+Y5cRyZ5upu6roPgPKd4= +github.com/aws/aws-sdk-go-v2 v1.41.0/go.mod h1:MayyLB8y+buD9hZqkCW3kX1AKq07Y5pXxtgB+rRFhz0= +github.com/aws/aws-sdk-go-v2/config v1.32.5 h1:pz3duhAfUgnxbtVhIK39PGF/AHYyrzGEyRD9Og0QrE8= +github.com/aws/aws-sdk-go-v2/config v1.32.5/go.mod h1:xmDjzSUs/d0BB7ClzYPAZMmgQdrodNjPPhd6bGASwoE= +github.com/aws/aws-sdk-go-v2/credentials v1.19.5 h1:xMo63RlqP3ZZydpJDMBsH9uJ10hgHYfQFIk1cHDXrR4= +github.com/aws/aws-sdk-go-v2/credentials v1.19.5/go.mod h1:hhbH6oRcou+LpXfA/0vPElh/e0M3aFeOblE1sssAAEk= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.16 h1:80+uETIWS1BqjnN9uJ0dBUaETh+P1XwFy5vwHwK5r9k= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.16/go.mod h1:wOOsYuxYuB/7FlnVtzeBYRcjSRtQpAW0hCP7tIULMwo= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.16 h1:rgGwPzb82iBYSvHMHXc8h9mRoOUBZIGFgKb9qniaZZc= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.16/go.mod h1:L/UxsGeKpGoIj6DxfhOWHWQ/kGKcd4I1VncE4++IyKA= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.16 h1:1jtGzuV7c82xnqOVfx2F0xmJcOw5374L7N6juGW6x6U= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.16/go.mod h1:M2E5OQf+XLe+SZGmmpaI2yy+J326aFf6/+54PoxSANc= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4 h1:WKuaxf++XKWlHWu9ECbMlha8WOEGm0OUEZqm4K/Gcfk= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4/go.mod h1:ZWy7j6v1vWGmPReu0iSGvRiise4YI5SkR3OHKTZ6Wuc= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.4 h1:0ryTNEdJbzUCEWkVXEXoqlXV72J5keC1GvILMOuD00E= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.4/go.mod h1:HQ4qwNZh32C3CBeO6iJLQlgtMzqeG17ziAA/3KDJFow= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.16 h1:oHjJHeUy0ImIV0bsrX0X91GkV5nJAyv1l1CC9lnO0TI= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.16/go.mod h1:iRSNGgOYmiYwSCXxXaKb9HfOEj40+oTKn8pTxMlYkRM= +github.com/aws/aws-sdk-go-v2/service/rds v1.113.1 h1:/vV0g/Su8rCTqT57UUYiFU/aRrPXz//fGDn1dkXblG4= +github.com/aws/aws-sdk-go-v2/service/rds v1.113.1/go.mod h1:q02df+DL73LN+jDXzj86tMsI6kKf1kfv61nB684H+o8= +github.com/aws/aws-sdk-go-v2/service/signin v1.0.4 h1:HpI7aMmJ+mm1wkSHIA2t5EaFFv5EFYXePW30p1EIrbQ= +github.com/aws/aws-sdk-go-v2/service/signin v1.0.4/go.mod h1:C5RdGMYGlfM0gYq/tifqgn4EbyX99V15P2V3R+VHbQU= +github.com/aws/aws-sdk-go-v2/service/sso v1.30.7 h1:eYnlt6QxnFINKzwxP5/Ucs1vkG7VT3Iezmvfgc2waUw= +github.com/aws/aws-sdk-go-v2/service/sso v1.30.7/go.mod h1:+fWt2UHSb4kS7Pu8y+BMBvJF0EWx+4H0hzNwtDNRTrg= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.12 h1:AHDr0DaHIAo8c9t1emrzAlVDFp+iMMKnPdYy6XO4MCE= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.12/go.mod h1:GQ73XawFFiWxyWXMHWfhiomvP3tXtdNar/fi8z18sx0= +github.com/aws/aws-sdk-go-v2/service/sts v1.41.5 h1:SciGFVNZ4mHdm7gpD1dgZYnCuVdX1s+lFTg4+4DOy70= +github.com/aws/aws-sdk-go-v2/service/sts v1.41.5/go.mod h1:iW40X4QBmUxdP+fZNOpfmkdMZqsovezbAeO+Ubiv2pk= +github.com/aws/smithy-go v1.24.0 h1:LpilSUItNPFr1eY85RYgTIg5eIEPtvFbskaFcmmIUnk= +github.com/aws/smithy-go v1.24.0/go.mod h1:LEj2LM3rBRQJxPZTB4KuzZkaZYnZPnvgIhb4pu07mx0= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.7.2 h1:4jaiDzPyXQvSd7D0EjG45355tLlV3VOECpq10pLC+8s= +github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/rds-refresh/main.go b/rds-refresh/main.go new file mode 100644 index 00000000..995df956 --- /dev/null +++ b/rds-refresh/main.go @@ -0,0 +1,174 @@ +/* +2025 © PostgresAI + +rds-refresh - Automate DBLab full refresh using RDS/Aurora snapshots + +This tool creates a temporary RDS/Aurora clone from a snapshot, updates +DBLab Engine config with the clone endpoint, triggers a full refresh, +and then cleans up the temporary clone. +*/ +package main + +import ( + "context" + "flag" + "fmt" + "os" + "os/signal" + "syscall" +) + +var ( + version = "dev" + buildTime = "unknown" +) + +func main() { + configPath := flag.String("config", "", "Path to configuration file") + dryRun := flag.Bool("dry-run", false, "Validate configuration without creating resources") + showVersion := flag.Bool("version", false, "Show version information") + help := flag.Bool("help", false, "Show help") + + flag.Usage = printUsage + flag.Parse() + + if *help { + printUsage() + os.Exit(0) + } + + if *showVersion { + fmt.Printf("rds-refresh version %s (built: %s)\n", version, buildTime) + os.Exit(0) + } + + if *configPath == "" { + fmt.Fprintln(os.Stderr, "error: -config flag is required") + printUsage() + os.Exit(1) + } + + if err := run(*configPath, *dryRun); err != nil { + fmt.Fprintf(os.Stderr, "error: %v\n", err) + os.Exit(1) + } +} + +func run(configPath string, dryRun bool) error { + cfg, err := LoadConfig(configPath) + if err != nil { + return fmt.Errorf("failed to load config: %w", err) + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // handle interrupt signals for graceful shutdown + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) + + go func() { + sig := <-sigCh + fmt.Printf("\nReceived signal %v, initiating graceful shutdown...\n", sig) + cancel() + }() + + logger := &DefaultLogger{} + + refresher, err := NewRefresher(ctx, cfg, logger) + if err != nil { + return fmt.Errorf("failed to initialize refresher: %w", err) + } + + if dryRun { + return refresher.DryRun(ctx) + } + + result := refresher.Run(ctx) + + fmt.Println() + fmt.Println("=== Refresh Summary ===") + fmt.Printf("Success: %v\n", result.Success) + fmt.Printf("Snapshot: %s\n", result.SnapshotID) + fmt.Printf("Clone ID: %s\n", result.CloneID) + fmt.Printf("Duration: %v\n", result.Duration.Round(1e9)) + + if result.Error != nil { + return result.Error + } + + return nil +} + +func printUsage() { + fmt.Fprintf(os.Stderr, `rds-refresh - Automate DBLab full refresh using RDS/Aurora snapshots + +This tool creates a temporary RDS/Aurora clone from a snapshot, updates +DBLab Engine config with the clone endpoint, triggers a full refresh, +and then cleans up the temporary clone. + +USAGE: + rds-refresh -config [options] + +OPTIONS: + -config Path to YAML configuration file (required) + -dry-run Validate configuration without creating resources + -version Show version information + -help Show this help message + +DEPLOYMENT: + This tool is designed to run as a container (Docker, ECS Task, Kubernetes Job) + or directly from the command line. The refresh process can take 1-4 hours + depending on database size, so long-running execution environments are required. + + Docker: + docker run -v /path/to/config.yaml:/config.yaml \ + postgres-ai/rds-refresh -config /config.yaml + + ECS Task / Kubernetes Job: + Schedule as a periodic task (e.g., daily) using your orchestration platform. + + Cron: + 0 2 * * * /usr/local/bin/rds-refresh -config /etc/rds-refresh/config.yaml + +EXAMPLE CONFIGURATION: + + source: + type: rds # or "aurora-cluster" + identifier: production-db + dbName: myapp + username: postgres + password: ${DB_PASSWORD} # supports environment variable expansion + + clone: + instanceClass: db.t3.medium + subnetGroup: default-vpc-subnet + securityGroups: + - sg-12345678 + publiclyAccessible: false + + dblab: + apiEndpoint: https://dblab.example.com:2345 + token: ${DBLAB_TOKEN} + pollInterval: 30s + timeout: 4h + + aws: + region: us-east-1 + +WORKFLOW: + 1. Verifies DBLab is healthy and not already refreshing + 2. Gets source database info from RDS/Aurora + 3. Finds the latest automated snapshot + 4. Creates a temporary RDS clone from the snapshot + 5. Waits for the clone to be available (10-30 minutes) + 6. Updates DBLab config with the clone endpoint + 7. Triggers DBLab full refresh + 8. Waits for refresh to complete (1-4 hours) + 9. Deletes the temporary clone + +For more information, see: + https://postgres.ai/docs/database-lab-engine + +`) +} diff --git a/rds-refresh/rds.go b/rds-refresh/rds.go new file mode 100644 index 00000000..6f6e0faf --- /dev/null +++ b/rds-refresh/rds.go @@ -0,0 +1,505 @@ +/* +2025 © PostgresAI +*/ + +package main + +import ( + "context" + "fmt" + "sort" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/rds" + "github.com/aws/aws-sdk-go-v2/service/rds/types" +) + +const ( + cloneNamePrefix = "dblab-refresh-" + waitPollInterval = 30 * time.Second + maxWaitTime = 2 * time.Hour + defaultPort int32 = 5432 +) + +// RDSClient wraps the AWS RDS client with convenience methods. +type RDSClient struct { + client *rds.Client + cfg *Config +} + +// CloneInfo holds information about a created clone. +type CloneInfo struct { + Identifier string + Endpoint string + Port int32 + IsCluster bool +} + +// NewRDSClient creates a new RDS client. +func NewRDSClient(ctx context.Context, cfg *Config) (*RDSClient, error) { + awsCfg, err := config.LoadDefaultConfig(ctx, config.WithRegion(cfg.AWS.Region)) + if err != nil { + return nil, fmt.Errorf("failed to load AWS config: %w", err) + } + + var opts []func(*rds.Options) + if cfg.AWS.Endpoint != "" { + opts = append(opts, func(o *rds.Options) { + o.BaseEndpoint = aws.String(cfg.AWS.Endpoint) + }) + } + + return &RDSClient{ + client: rds.NewFromConfig(awsCfg, opts...), + cfg: cfg, + }, nil +} + +// FindLatestSnapshot finds the latest available snapshot for the source. +func (r *RDSClient) FindLatestSnapshot(ctx context.Context) (string, error) { + if r.cfg.Source.SnapshotIdentifier != "" { + return r.cfg.Source.SnapshotIdentifier, nil + } + + if r.cfg.Source.Type == "aurora-cluster" { + return r.findLatestClusterSnapshot(ctx) + } + + return r.findLatestDBSnapshot(ctx) +} + +func (r *RDSClient) findLatestDBSnapshot(ctx context.Context) (string, error) { + input := &rds.DescribeDBSnapshotsInput{ + DBInstanceIdentifier: aws.String(r.cfg.Source.Identifier), + SnapshotType: aws.String("automated"), + } + + result, err := r.client.DescribeDBSnapshots(ctx, input) + if err != nil { + return "", fmt.Errorf("failed to describe DB snapshots: %w", err) + } + + if len(result.DBSnapshots) == 0 { + return "", fmt.Errorf("no automated snapshots found for RDS instance %q", r.cfg.Source.Identifier) + } + + // Sort by creation time (newest first) + sort.Slice(result.DBSnapshots, func(i, j int) bool { + ti := result.DBSnapshots[i].SnapshotCreateTime + tj := result.DBSnapshots[j].SnapshotCreateTime + + if ti == nil || tj == nil { + return ti != nil + } + + return ti.After(*tj) + }) + + // Find the first available snapshot + for _, snap := range result.DBSnapshots { + if snap.Status != nil && *snap.Status == "available" { + return *snap.DBSnapshotIdentifier, nil + } + } + + return "", fmt.Errorf("no available snapshots found for RDS instance %q", r.cfg.Source.Identifier) +} + +func (r *RDSClient) findLatestClusterSnapshot(ctx context.Context) (string, error) { + input := &rds.DescribeDBClusterSnapshotsInput{ + DBClusterIdentifier: aws.String(r.cfg.Source.Identifier), + SnapshotType: aws.String("automated"), + } + + result, err := r.client.DescribeDBClusterSnapshots(ctx, input) + if err != nil { + return "", fmt.Errorf("failed to describe DB cluster snapshots: %w", err) + } + + if len(result.DBClusterSnapshots) == 0 { + return "", fmt.Errorf("no automated snapshots found for Aurora cluster %q", r.cfg.Source.Identifier) + } + + // Sort by creation time (newest first) + sort.Slice(result.DBClusterSnapshots, func(i, j int) bool { + ti := result.DBClusterSnapshots[i].SnapshotCreateTime + tj := result.DBClusterSnapshots[j].SnapshotCreateTime + + if ti == nil || tj == nil { + return ti != nil + } + + return ti.After(*tj) + }) + + // Find the first available snapshot + for _, snap := range result.DBClusterSnapshots { + if snap.Status != nil && *snap.Status == "available" { + return *snap.DBClusterSnapshotIdentifier, nil + } + } + + return "", fmt.Errorf("no available snapshots found for Aurora cluster %q", r.cfg.Source.Identifier) +} + +// CreateClone creates a temporary clone from a snapshot. +func (r *RDSClient) CreateClone(ctx context.Context, snapshotID string) (*CloneInfo, error) { + cloneName := fmt.Sprintf("%s%s", cloneNamePrefix, time.Now().UTC().Format("20060102-150405")) + + if r.cfg.Source.Type == "aurora-cluster" { + return r.createAuroraClone(ctx, snapshotID, cloneName) + } + + return r.createRDSClone(ctx, snapshotID, cloneName) +} + +func (r *RDSClient) createRDSClone(ctx context.Context, snapshotID, cloneName string) (*CloneInfo, error) { + tags := r.buildTags() + + input := &rds.RestoreDBInstanceFromDBSnapshotInput{ + DBInstanceIdentifier: aws.String(cloneName), + DBSnapshotIdentifier: aws.String(snapshotID), + DBInstanceClass: aws.String(r.cfg.Clone.InstanceClass), + PubliclyAccessible: aws.Bool(r.cfg.Clone.PubliclyAccessible), + Tags: tags, + DeletionProtection: aws.Bool(r.cfg.Clone.DeletionProtection), + } + + if r.cfg.Clone.DBSubnetGroupName != "" { + input.DBSubnetGroupName = aws.String(r.cfg.Clone.DBSubnetGroupName) + } + + if len(r.cfg.Clone.VPCSecurityGroupIDs) > 0 { + input.VpcSecurityGroupIds = r.cfg.Clone.VPCSecurityGroupIDs + } + + if r.cfg.Clone.ParameterGroupName != "" { + input.DBParameterGroupName = aws.String(r.cfg.Clone.ParameterGroupName) + } + + if r.cfg.Clone.OptionGroupName != "" { + input.OptionGroupName = aws.String(r.cfg.Clone.OptionGroupName) + } + + if r.cfg.Clone.Port > 0 { + input.Port = aws.Int32(r.cfg.Clone.Port) + } + + if r.cfg.Clone.EnableIAMAuth { + input.EnableIAMDatabaseAuthentication = aws.Bool(true) + } + + if r.cfg.Clone.StorageType != "" { + input.StorageType = aws.String(r.cfg.Clone.StorageType) + } + + _, err := r.client.RestoreDBInstanceFromDBSnapshot(ctx, input) + if err != nil { + return nil, fmt.Errorf("failed to restore DB instance from snapshot: %w", err) + } + + return &CloneInfo{ + Identifier: cloneName, + IsCluster: false, + }, nil +} + +func (r *RDSClient) createAuroraClone(ctx context.Context, snapshotID, cloneName string) (*CloneInfo, error) { + tags := r.buildTags() + + // Get the engine from the snapshot first + snapshotResp, err := r.client.DescribeDBClusterSnapshots(ctx, &rds.DescribeDBClusterSnapshotsInput{ + DBClusterSnapshotIdentifier: aws.String(snapshotID), + }) + if err != nil { + return nil, fmt.Errorf("failed to describe cluster snapshot: %w", err) + } + + if len(snapshotResp.DBClusterSnapshots) == 0 { + return nil, fmt.Errorf("snapshot %q not found", snapshotID) + } + + snapshot := snapshotResp.DBClusterSnapshots[0] + + // Restore the Aurora cluster + clusterInput := &rds.RestoreDBClusterFromSnapshotInput{ + DBClusterIdentifier: aws.String(cloneName), + SnapshotIdentifier: aws.String(snapshotID), + Engine: snapshot.Engine, + Tags: tags, + DeletionProtection: aws.Bool(r.cfg.Clone.DeletionProtection), + } + + if r.cfg.Clone.DBSubnetGroupName != "" { + clusterInput.DBSubnetGroupName = aws.String(r.cfg.Clone.DBSubnetGroupName) + } + + if len(r.cfg.Clone.VPCSecurityGroupIDs) > 0 { + clusterInput.VpcSecurityGroupIds = r.cfg.Clone.VPCSecurityGroupIDs + } + + if r.cfg.Clone.DBClusterParameterGroupName != "" { + clusterInput.DBClusterParameterGroupName = aws.String(r.cfg.Clone.DBClusterParameterGroupName) + } + + if r.cfg.Clone.Port > 0 { + clusterInput.Port = aws.Int32(r.cfg.Clone.Port) + } + + if r.cfg.Clone.EnableIAMAuth { + clusterInput.EnableIAMDatabaseAuthentication = aws.Bool(true) + } + + _, err = r.client.RestoreDBClusterFromSnapshot(ctx, clusterInput) + if err != nil { + return nil, fmt.Errorf("failed to restore DB cluster from snapshot: %w", err) + } + + // Wait for cluster to be available before creating instance + if err := r.waitForClusterAvailable(ctx, cloneName); err != nil { + // Try to clean up the cluster + _ = r.deleteAuroraCluster(ctx, cloneName) + return nil, fmt.Errorf("cluster did not become available: %w", err) + } + + // Create a DB instance in the cluster + instanceName := cloneName + "-instance" + instanceInput := &rds.CreateDBInstanceInput{ + DBInstanceIdentifier: aws.String(instanceName), + DBInstanceClass: aws.String(r.cfg.Clone.InstanceClass), + DBClusterIdentifier: aws.String(cloneName), + Engine: snapshot.Engine, + Tags: tags, + } + + if r.cfg.Clone.ParameterGroupName != "" { + instanceInput.DBParameterGroupName = aws.String(r.cfg.Clone.ParameterGroupName) + } + + _, err = r.client.CreateDBInstance(ctx, instanceInput) + if err != nil { + // Try to clean up the cluster + _ = r.deleteAuroraCluster(ctx, cloneName) + return nil, fmt.Errorf("failed to create DB instance in cluster: %w", err) + } + + return &CloneInfo{ + Identifier: cloneName, + IsCluster: true, + }, nil +} + +func (r *RDSClient) buildTags() []types.Tag { + tags := make([]types.Tag, 0, len(r.cfg.Clone.Tags)) + + for k, v := range r.cfg.Clone.Tags { + tags = append(tags, types.Tag{ + Key: aws.String(k), + Value: aws.String(v), + }) + } + + return tags +} + +// WaitForCloneAvailable waits for the clone to become available and returns connection info. +func (r *RDSClient) WaitForCloneAvailable(ctx context.Context, clone *CloneInfo) error { + if clone.IsCluster { + instanceName := clone.Identifier + "-instance" + + if err := r.waitForInstanceAvailable(ctx, instanceName); err != nil { + return err + } + + // Get the cluster endpoint + clusterResp, err := r.client.DescribeDBClusters(ctx, &rds.DescribeDBClustersInput{ + DBClusterIdentifier: aws.String(clone.Identifier), + }) + if err != nil { + return fmt.Errorf("failed to describe cluster: %w", err) + } + + if len(clusterResp.DBClusters) == 0 { + return fmt.Errorf("cluster %q not found", clone.Identifier) + } + + cluster := clusterResp.DBClusters[0] + clone.Endpoint = aws.ToString(cluster.Endpoint) + clone.Port = aws.ToInt32(cluster.Port) + + if clone.Port == 0 { + clone.Port = defaultPort + } + + return nil + } + + if err := r.waitForInstanceAvailable(ctx, clone.Identifier); err != nil { + return err + } + + // Get the instance endpoint + instanceResp, err := r.client.DescribeDBInstances(ctx, &rds.DescribeDBInstancesInput{ + DBInstanceIdentifier: aws.String(clone.Identifier), + }) + if err != nil { + return fmt.Errorf("failed to describe instance: %w", err) + } + + if len(instanceResp.DBInstances) == 0 { + return fmt.Errorf("instance %q not found", clone.Identifier) + } + + instance := instanceResp.DBInstances[0] + + if instance.Endpoint != nil { + clone.Endpoint = aws.ToString(instance.Endpoint.Address) + clone.Port = aws.ToInt32(instance.Endpoint.Port) + } + + if clone.Port == 0 { + clone.Port = defaultPort + } + + return nil +} + +func (r *RDSClient) waitForInstanceAvailable(ctx context.Context, identifier string) error { + waiter := rds.NewDBInstanceAvailableWaiter(r.client) + + return waiter.Wait(ctx, &rds.DescribeDBInstancesInput{ + DBInstanceIdentifier: aws.String(identifier), + }, maxWaitTime) +} + +func (r *RDSClient) waitForClusterAvailable(ctx context.Context, identifier string) error { + waiter := rds.NewDBClusterAvailableWaiter(r.client) + + return waiter.Wait(ctx, &rds.DescribeDBClustersInput{ + DBClusterIdentifier: aws.String(identifier), + }, maxWaitTime) +} + +// DeleteClone deletes the temporary clone. +func (r *RDSClient) DeleteClone(ctx context.Context, clone *CloneInfo) error { + if clone.IsCluster { + return r.deleteAuroraCluster(ctx, clone.Identifier) + } + + return r.deleteRDSInstance(ctx, clone.Identifier) +} + +func (r *RDSClient) deleteRDSInstance(ctx context.Context, identifier string) error { + // First, disable deletion protection if enabled + _, _ = r.client.ModifyDBInstance(ctx, &rds.ModifyDBInstanceInput{ + DBInstanceIdentifier: aws.String(identifier), + DeletionProtection: aws.Bool(false), + ApplyImmediately: aws.Bool(true), + }) + + _, err := r.client.DeleteDBInstance(ctx, &rds.DeleteDBInstanceInput{ + DBInstanceIdentifier: aws.String(identifier), + SkipFinalSnapshot: aws.Bool(true), + DeleteAutomatedBackups: aws.Bool(true), + }) + + if err != nil { + return fmt.Errorf("failed to delete DB instance: %w", err) + } + + return nil +} + +func (r *RDSClient) deleteAuroraCluster(ctx context.Context, clusterIdentifier string) error { + // First, delete all instances in the cluster + instancesResp, err := r.client.DescribeDBInstances(ctx, &rds.DescribeDBInstancesInput{ + Filters: []types.Filter{ + { + Name: aws.String("db-cluster-id"), + Values: []string{clusterIdentifier}, + }, + }, + }) + if err != nil { + return fmt.Errorf("failed to list cluster instances: %w", err) + } + + for _, instance := range instancesResp.DBInstances { + if err := r.deleteRDSInstance(ctx, aws.ToString(instance.DBInstanceIdentifier)); err != nil { + return fmt.Errorf("failed to delete cluster instance: %w", err) + } + } + + // Wait for all instances to be deleted + for _, instance := range instancesResp.DBInstances { + waiter := rds.NewDBInstanceDeletedWaiter(r.client) + + if err := waiter.Wait(ctx, &rds.DescribeDBInstancesInput{ + DBInstanceIdentifier: instance.DBInstanceIdentifier, + }, maxWaitTime); err != nil { + return fmt.Errorf("failed waiting for instance deletion: %w", err) + } + } + + // Disable deletion protection on cluster + _, _ = r.client.ModifyDBCluster(ctx, &rds.ModifyDBClusterInput{ + DBClusterIdentifier: aws.String(clusterIdentifier), + DeletionProtection: aws.Bool(false), + ApplyImmediately: aws.Bool(true), + }) + + // Delete the cluster + _, err = r.client.DeleteDBCluster(ctx, &rds.DeleteDBClusterInput{ + DBClusterIdentifier: aws.String(clusterIdentifier), + SkipFinalSnapshot: aws.Bool(true), + }) + + if err != nil { + return fmt.Errorf("failed to delete DB cluster: %w", err) + } + + return nil +} + +// GetSourceInfo returns information about the source database. +func (r *RDSClient) GetSourceInfo(ctx context.Context) (string, error) { + if r.cfg.Source.Type == "aurora-cluster" { + resp, err := r.client.DescribeDBClusters(ctx, &rds.DescribeDBClustersInput{ + DBClusterIdentifier: aws.String(r.cfg.Source.Identifier), + }) + if err != nil { + return "", fmt.Errorf("failed to describe source cluster: %w", err) + } + + if len(resp.DBClusters) == 0 { + return "", fmt.Errorf("source cluster %q not found", r.cfg.Source.Identifier) + } + + cluster := resp.DBClusters[0] + + return fmt.Sprintf("Aurora cluster %s (engine: %s, version: %s)", + r.cfg.Source.Identifier, + aws.ToString(cluster.Engine), + aws.ToString(cluster.EngineVersion)), nil + } + + resp, err := r.client.DescribeDBInstances(ctx, &rds.DescribeDBInstancesInput{ + DBInstanceIdentifier: aws.String(r.cfg.Source.Identifier), + }) + if err != nil { + return "", fmt.Errorf("failed to describe source instance: %w", err) + } + + if len(resp.DBInstances) == 0 { + return "", fmt.Errorf("source instance %q not found", r.cfg.Source.Identifier) + } + + instance := resp.DBInstances[0] + + return fmt.Sprintf("RDS instance %s (engine: %s, version: %s)", + r.cfg.Source.Identifier, + aws.ToString(instance.Engine), + aws.ToString(instance.EngineVersion)), nil +} diff --git a/rds-refresh/refresher.go b/rds-refresh/refresher.go new file mode 100644 index 00000000..803a6812 --- /dev/null +++ b/rds-refresh/refresher.go @@ -0,0 +1,262 @@ +/* +2025 © PostgresAI +*/ + +package main + +import ( + "context" + "fmt" + "time" +) + +// Logger defines the logging interface. +type Logger interface { + Info(msg string, args ...interface{}) + Error(msg string, args ...interface{}) + Debug(msg string, args ...interface{}) +} + +// DefaultLogger is a simple stdout logger. +type DefaultLogger struct{} + +// Info logs an info message. +func (l *DefaultLogger) Info(msg string, args ...interface{}) { + fmt.Printf("[INFO] "+msg+"\n", args...) +} + +// Error logs an error message. +func (l *DefaultLogger) Error(msg string, args ...interface{}) { + fmt.Printf("[ERROR] "+msg+"\n", args...) +} + +// Debug logs a debug message. +func (l *DefaultLogger) Debug(msg string, args ...interface{}) { + fmt.Printf("[DEBUG] "+msg+"\n", args...) +} + +// Refresher orchestrates the RDS/Aurora clone and DBLab refresh workflow. +type Refresher struct { + cfg *Config + rds *RDSClient + dblab *DBLabClient + logger Logger +} + +// RefreshResult contains the result of a refresh operation. +type RefreshResult struct { + Success bool + SnapshotID string + CloneID string + StartTime time.Time + EndTime time.Time + Duration time.Duration + Error error + CloneEndpoint string +} + +// NewRefresher creates a new Refresher instance. +func NewRefresher(ctx context.Context, cfg *Config, logger Logger) (*Refresher, error) { + if logger == nil { + logger = &DefaultLogger{} + } + + rdsClient, err := NewRDSClient(ctx, cfg) + if err != nil { + return nil, fmt.Errorf("failed to create RDS client: %w", err) + } + + dblabClient := NewDBLabClient(&cfg.DBLab, logger) + + return &Refresher{ + cfg: cfg, + rds: rdsClient, + dblab: dblabClient, + logger: logger, + }, nil +} + +// Run executes the full refresh workflow: +// 1. Verifies DBLab is healthy and not already refreshing +// 2. Gets source database info +// 3. Finds the latest snapshot +// 4. Creates a temporary clone from the snapshot +// 5. Waits for the clone to be available +// 6. Updates DBLab config with the clone endpoint +// 7. Triggers DBLab full refresh +// 8. Waits for refresh to complete +// 9. Deletes the temporary clone +func (r *Refresher) Run(ctx context.Context) *RefreshResult { + result := &RefreshResult{ + StartTime: time.Now(), + } + + defer func() { + result.EndTime = time.Now() + result.Duration = result.EndTime.Sub(result.StartTime) + }() + + // Step 1: Check DBLab health and status + r.logger.Info("Checking DBLab Engine health...") + + if err := r.dblab.Health(ctx); err != nil { + result.Error = fmt.Errorf("DBLab health check failed: %w", err) + return result + } + + inProgress, err := r.dblab.IsRefreshInProgress(ctx) + if err != nil { + result.Error = fmt.Errorf("failed to check DBLab status: %w", err) + return result + } + + if inProgress { + result.Error = fmt.Errorf("refresh already in progress, skipping") + return result + } + + // Step 2: Get source info + r.logger.Info("Checking source database...") + + sourceInfo, err := r.rds.GetSourceInfo(ctx) + if err != nil { + result.Error = fmt.Errorf("failed to get source info: %w", err) + return result + } + + r.logger.Info("Source: %s", sourceInfo) + + // Step 3: Find latest snapshot + r.logger.Info("Finding latest snapshot...") + + snapshotID, err := r.rds.FindLatestSnapshot(ctx) + if err != nil { + result.Error = fmt.Errorf("failed to find snapshot: %w", err) + return result + } + + result.SnapshotID = snapshotID + r.logger.Info("Using snapshot: %s", snapshotID) + + // Step 4: Create temporary clone + r.logger.Info("Creating temporary RDS clone from snapshot...") + + clone, err := r.rds.CreateClone(ctx, snapshotID) + if err != nil { + result.Error = fmt.Errorf("failed to create clone: %w", err) + return result + } + + result.CloneID = clone.Identifier + r.logger.Info("Created clone: %s", clone.Identifier) + + // Ensure cleanup on any exit + defer func() { + r.logger.Info("Cleaning up temporary clone %s...", clone.Identifier) + + if deleteErr := r.rds.DeleteClone(context.Background(), clone); deleteErr != nil { + r.logger.Error("Failed to delete clone %s: %v (manual cleanup may be required)", clone.Identifier, deleteErr) + } else { + r.logger.Info("Successfully deleted temporary clone %s", clone.Identifier) + } + }() + + // Step 5: Wait for clone to be available + r.logger.Info("Waiting for clone to become available (this may take 10-30 minutes)...") + + if err := r.rds.WaitForCloneAvailable(ctx, clone); err != nil { + result.Error = fmt.Errorf("clone did not become available: %w", err) + return result + } + + result.CloneEndpoint = clone.Endpoint + r.logger.Info("Clone available at: %s:%d", clone.Endpoint, clone.Port) + + // Step 6: Update DBLab config with clone endpoint + r.logger.Info("Updating DBLab source config with clone endpoint...") + + if err := r.dblab.UpdateSourceConfig( + ctx, + clone.Endpoint, + int(clone.Port), + r.cfg.Source.DBName, + r.cfg.Source.Username, + r.cfg.Source.Password, + ); err != nil { + result.Error = fmt.Errorf("failed to update DBLab config: %w", err) + return result + } + + r.logger.Info("DBLab config updated successfully") + + // Step 7: Trigger DBLab full refresh + r.logger.Info("Triggering DBLab full refresh...") + + if err := r.dblab.TriggerFullRefresh(ctx); err != nil { + result.Error = fmt.Errorf("failed to trigger refresh: %w", err) + return result + } + + r.logger.Info("Full refresh triggered, waiting for completion...") + + // Step 7: Wait for refresh to complete + pollInterval := r.cfg.DBLab.PollInterval.Duration() + timeout := r.cfg.DBLab.Timeout.Duration() + + if err := r.dblab.WaitForRefreshComplete(ctx, pollInterval, timeout); err != nil { + result.Error = fmt.Errorf("refresh did not complete: %w", err) + return result + } + + r.logger.Info("DBLab refresh completed successfully!") + result.Success = true + + return result +} + +// DryRun performs all validation steps without actually creating resources. +func (r *Refresher) DryRun(ctx context.Context) error { + r.logger.Info("=== DRY RUN MODE ===") + + // Check DBLab + r.logger.Info("Checking DBLab Engine health...") + + if err := r.dblab.Health(ctx); err != nil { + return fmt.Errorf("DBLab health check failed: %w", err) + } + + r.logger.Info("DBLab Engine is healthy") + + // Check current status + status, err := r.dblab.GetStatus(ctx) + if err != nil { + return fmt.Errorf("failed to get DBLab status: %w", err) + } + + r.logger.Info("DBLab retrieval status: %s", status.Retrieving.Status) + + // Check source + r.logger.Info("Checking source database...") + + sourceInfo, err := r.rds.GetSourceInfo(ctx) + if err != nil { + return fmt.Errorf("failed to get source info: %w", err) + } + + r.logger.Info("Source: %s", sourceInfo) + + // Check snapshot + r.logger.Info("Finding latest snapshot...") + + snapshotID, err := r.rds.FindLatestSnapshot(ctx) + if err != nil { + return fmt.Errorf("failed to find snapshot: %w", err) + } + + r.logger.Info("Would use snapshot: %s", snapshotID) + r.logger.Info("Would create clone with instance class: %s", r.cfg.Clone.InstanceClass) + + r.logger.Info("=== DRY RUN COMPLETE - All checks passed ===") + + return nil +}