Filtering a CSV in R

I have a 2M line CSV (exported from Redshift), and needed to do some sanity checking. In SQL I would have written something like this:

select count(*)
from foo
where date >= '2020-10-1' and date < '2020-11-1'
and foo = 'bar'

So what’s the equivalent in R?

The recommendation seems to be to use data.table:

install.packages('data.table')
library(data.table)
data <- fread("foo.csv")
str(data)

Filtering by value is easy:

foo <- data[foo == 'bar']

But a date range is a little trickier. R seems to know that the strings are a date format:

POSIXct, format: "2020-05-21 14:16:24" "2020-05-21 14:16:28" ...

I imagine it’s possible to truncate those values, but the easiest thing for me was to add a new column:

foo$date <- as.Date(foo$started_at)

and then use that with subset:

> nrow(subset(foo, date >= "2020-10-1" & date < "2020-11-1"))
[1] 73594
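
With data.table, the whole check can also be done in one expression (.N is the row count); a sketch, assuming the same column names as above:

data[foo == 'bar' &
     as.Date(started_at) >= as.Date('2020-10-01') &
     as.Date(started_at) < as.Date('2020-11-01'),
     .N]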

Using dependencies as a test oracle

I have long been a disciple of what is sometimes known as the “London school” of TDD (or “outside in” design), but I like to think I’m open to alternatives when they prove useful.

With that in mind, I found James Shore’s Testing Without Mocks series very interesting. While I’m not quite ready to dive in at the deep end of that approach, one of the reasons to mock your dependencies (other than avoiding IO) is to remove the complexity from your tests, and James offers a handy alternative.

beforeEach(function() {
    dep1 = sinon.stub().resolves();
    ...
    handler = new Handler(dep1, dep2, dep3);
});

Rather than using [insert favourite mocking library] to represent those dependencies, and risking the slippage that can occur when the real version changes but the tests are not updated (if you haven’t got contract tests for everything), you can use the real object (ideally some pure “business” function) both in the set up and in your assertions, as a “test oracle”.

beforeEach(function() {
    dep1 = new Dep1();
    ...
});

it("should return the expected flurble", function() {
    ...
    const res = await handler.handle(req);

    expect(res.flurble).to.equal(dep1.bar(req.foo));
});

This way, if the implementation of the dependency changes, the test should still pass, unless the change would actually affect the SUT.

I’m sure this approach comes with its own tradeoffs, and won’t help you with anything other than simple dependencies, but it can be useful in situations where you would like to use the real dependency and still keep the tests relatively simple.

(This is probably another force pushing in the direction of a ports and adapters architecture (or impure-pure sandwich), allowing you to use “sociable” tests in the kernel, and narrow integration tests at the edges.)

RDS PostgreSQL WalWriteLock

We recently had a service degradation/outage, which manifested as WalWriteLock waits in Performance Insights.

The direct cause was autovacuum on a large (heavily updated) table, but it had run ~1 hour earlier, without any issues.

Our short-term solution was to raise the autovacuum threshold and kill the process. But it would have to run again at some point, or we’d be in real trouble.

We checked the usual suspects for autovacuum slowdown, but couldn’t find any transaction older than the autovacuum process itself, or any abandoned replication slots.
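
The checks themselves are standard catalog queries; something along these lines (a sketch, not the exact queries we ran):

-- any transactions older than the autovacuum worker?
select pid, xact_start, state, query
from pg_stat_activity
where xact_start is not null
order by xact_start;

-- any replication slots holding back the xmin horizon?
select slot_name, active, xmin, restart_lsn
from pg_replication_slots;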

We don’t currently have a read replica (although we are using Multi-AZ), but this prompted us to realise that we still had wal_level set to logical, after using DMS to upgrade from Postgres 10 to 11. That level generates considerably more WAL than the next one down (replica).
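
Checking is straightforward (on RDS the level is controlled by the rds.logical_replication parameter, and changing it needs a reboot/failover):

-- wal_level is one of: minimal, replica, logical
show wal_level;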

After turning that off (and failing over), we triggered autovacuum again, but were still seeing high WalWriteLock contention. Eventually, we found this 10-year-old breadcrumb on the pg-admin mailing list:

Is it vacuuming a table which was bulk loaded at some time in the past? If so, this can happen any time later (usually during busy periods when many transactions numbers are being assigned)

https://www.postgresql.org/message-id/4DBFF5AE020000250003D1D9%40gw.wicourts.gov

So it seems like this was a little treat left for us by DMS, which, combined with the extra WAL from logical, was enough to push us over the edge at a busy time.

Once that particular autovacuum run had managed to complete, the next one was back to normal.

Triggering a cron lambda

Once you have a lambda ready to run, you need an EventBridge rule to trigger it:

docker run --rm -it -v ~/.aws:/root/.aws -v $PWD:/data -w /data -e AWS_PROFILE amazon/aws-cli events put-rule --name foo --schedule-expression 'cron(0 4 * * ? *)'

A rule can either run at a regular rate (a rate expression) or at specific times (a cron expression, as above).
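
For example, a rate-based rule would look something like this (same placeholders as above):

docker run --rm -it -v ~/.aws:/root/.aws -v $PWD:/data -w /data -e AWS_PROFILE amazon/aws-cli events put-rule --name foo --schedule-expression 'rate(5 minutes)'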

And your lambda needs a resource-based policy allowing EventBridge to invoke it:

aws-cli lambda add-permission --function-name foo --statement-id foo --action 'lambda:InvokeFunction' --principal events.amazonaws.com --source-arn arn:aws:events:region:account:rule/foo

Finally, you need a targets file:

[{
    "Id": "1",
    "Arn": "arn:aws:lambda:region:account:function:foo"
}]

to add to the rule:

aws-cli events put-targets --rule foo --targets file://targets.json
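
To sanity check the wiring, you can list the targets back:

aws-cli events list-targets-by-rule --rule foo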

Cron lambda (Python)

For a simple task in Redshift, such as refreshing a materialized view, you can use a scheduled query; but sometimes you really want a proper scripting language rather than SQL.
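
For the simple case, the scheduled query is just a single statement (assuming a materialized view called foo):

REFRESH MATERIALIZED VIEW foo;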

You can use a Docker image as a Lambda now, but I still find uploading a zip easier. And while it’s possible to pass the db creds as env vars, it’s better to use temporary credentials:

import boto3
import psycopg2

def handler(event, context):
    client = boto3.client('redshift')

    # ask Redshift for short-lived credentials for this db user
    cluster_credentials = client.get_cluster_credentials(
        DbUser='user',
        DbName='db',
        ClusterIdentifier='cluster',
    )

    # connect with the temporary user/password, rather than stored secrets
    conn = psycopg2.connect(
        host="foo.bar.region.redshift.amazonaws.com",
        port="5439",
        dbname="db",
        user=cluster_credentials["DbUser"],
        password=cluster_credentials["DbPassword"],
    )

    with conn.cursor() as cursor:
        ...

To build the bundle:

pip install -r requirements.txt -t ./package
cd package && zip -r ../foo.zip . && cd ..
zip -g foo.zip app.py

You need a trust policy to allow Lambda to assume the role:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": "sts:AssumeRole",
            "Principal": {
                "Service": "lambda.amazonaws.com"
            },
            "Effect": "Allow",
            "Sid": ""
        }
    ]
}

And a policy allowing the function to fetch the Redshift creds:

{
    "Version": "2012-10-17",
    "Statement": [{
        "Sid": "GetClusterCredsStatement",
        "Effect": "Allow",
        "Action": [
            "redshift:GetClusterCredentials"
        ],
        "Resource": [
            "arn:aws:redshift:region:account:dbuser:cluster/db",
            "arn:aws:redshift:region:account:dbname:cluster/db"
        ]
    }]
}

Then create the IAM role and attach the policies:

docker run --rm -it -v ~/.aws:/root/.aws -v $PWD:/data -w /data -e AWS_PROFILE amazon/aws-cli iam create-role --role-name role --assume-role-policy-document file://trust-policy.json
docker run --rm -it -v ~/.aws:/root/.aws -v $PWD:/data -w /data -e AWS_PROFILE amazon/aws-cli iam attach-role-policy --role-name role --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
docker run --rm -it -v ~/.aws:/root/.aws -v $PWD:/data -w /data -e AWS_PROFILE amazon/aws-cli iam attach-role-policy --role-name role --policy-arn arn:aws:iam::aws:policy/service-role/AWSXRayDaemonWriteAccess
docker run --rm -it -v ~/.aws:/root/.aws -v $PWD:/data -w /data -e AWS_PROFILE amazon/aws-cli iam put-role-policy --role-name role --policy-name GetClusterCredentials --policy-document file://get-cluster-credentials.json

And, finally, the lambda itself:

docker run --rm -it -v ~/.aws:/root/.aws -v $PWD:/data -w /data -e AWS_PROFILE amazon/aws-cli lambda create-function --function-name foo --runtime python3.7 --zip-file fileb://foo.zip --handler app.handler --role arn:aws:iam::account:role/role --timeout 900

If you need to update the code afterwards:

docker run --rm -it -v ~/.aws:/root/.aws -v $PWD:/data -w /data -e AWS_PROFILE amazon/aws-cli lambda update-function-code --function-name foo --zip-file fileb://foo.zip

You can test the lambda in the console, or with a quick CLI invocation (sketched below). Next time, we’ll look at how to trigger it using EventBridge.
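
For the CLI route, something like this should work (same docker wrapper as before; the output lands in out.json):

docker run --rm -it -v ~/.aws:/root/.aws -v $PWD:/data -w /data -e AWS_PROFILE amazon/aws-cli lambda invoke --function-name foo out.json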

No module named ‘psycopg2._psycopg’

I was trying to set up a Python lambda, and fell at the first hurdle: the “No module named ‘psycopg2._psycopg’” error in the title.

What made it confusing was that I had copied an existing lambda that was working fine. I checked a few things that were different: the Python version (3.7), even the name of the module/function; no effect.

I was using psycopg2-binary, and the zip file structure looked right. Eventually, I found an SO answer suggesting it could be architecture related, at which point I realised that I had run pip install using docker, rather than in a venv.

I have no idea why that mattered (uname showed the same arch from python:3.7 as my laptop), but onwards to the next problem! 🤷
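
If you hit the same thing, one option (which I haven’t verified here) is to ask pip explicitly for a Lambda-compatible manylinux wheel:

pip install --platform manylinux2014_x86_64 --only-binary=:all: -t ./package psycopg2-binary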

Column aliases are not supported (Redshift)

I was trying to create a materialized view recently, and got this error:

WARNING:  An incrementally maintained materialized view could not be created, 
reason: Column aliases are not supported. The materialized view created, ***, 
will be recomputed from scratch for every REFRESH.

My view definition did include some column aliases:

CREATE MATERIALIZED VIEW foo
AUTO REFRESH YES
AS
    SELECT
        trunc(date_col) date,
        platform,
        operator,
        category,
        game_name,
        count(*) as count1,
        sum(bar) as sum1,
        count(distinct baz) as count2
    FROM xxx
    GROUP BY 1, 2, 3, 4, 5;

so that did seem believable (although a little unreasonable, and not covered in the documented limitations).

I decided to split my view up, so I didn’t have multiple aggregations of the same type and could use the generated column names (e.g. count). I could then have a non-materialized “super” view to join them all back together again.

At this point, thanks to some incompetent copy-pasta, I discovered that Redshift would quite happily create an auto-refresh view with a column alias.

Eventually, I realised that the real problem was the count(distinct), which makes much more sense. You can’t incrementally update a distinct count without tracking all the existing values.
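
For example, pulling the distinct count out and keeping the rest incrementally refreshable might look something like this (same made-up columns as above):

CREATE MATERIALIZED VIEW foo_counts
AUTO REFRESH YES
AS
    SELECT
        trunc(date_col) date,
        platform,
        operator,
        category,
        game_name,
        count(*) as count1,
        sum(bar) as sum1
    FROM xxx
    GROUP BY 1, 2, 3, 4, 5;

The count(distinct baz) can then live in its own view, which gets recomputed from scratch on every refresh.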

Side note: it is also possible to use APPROXIMATE COUNT(DISTINCT …), with some caveats.

Jenkins seed job

In the brave new world of Jenkins as Code, you can use the Configuration as Code plugin (CasC) to specify an initial seed job (using the Job DSL):

jobs:
  - script: >
      pipelineJob('jenkins-job-dsl') {
        definition {
          cpsScm{
            scm {
              gitSCM {
                userRemoteConfigs {
                  browser {
                    githubWeb {
                      repoUrl('https://github.com/foo/bar')
                    }
                  }
                  gitTool("github")
                  userRemoteConfig {
                    credentialsId("github-creds")
                    name("")
                    refspec("")
                    url("git@github.com:foo/bar.git")
                  }
                }
                branches {
                  branchSpec { name("main") }
                }
              }
            }
            scriptPath("Jenkinsfile.seed")
          }
        }
        properties {
          pipelineTriggers {
            triggers {
              cron { spec('@daily') }
              githubPush()
            }
          }
        }
      }

using a Jenkinsfile to call the Job DSL again:

pipeline {
    agent any

    options {
        timestamps ()
        disableConcurrentBuilds()
    }

    stages {
        stage('Clean') {
            steps {
                deleteDir()
            }
        }

        stage('Checkout') {
            steps {
                checkout scm
            }
        }

        stage('Job DSL') {
            steps {
                jobDsl(
                    targets: """
                        jobs/*.groovy
                        views/*.groovy
                    """
                )
            }
        }
    }
}

and create all the jobs/views from that repo (each job pointing at another Jenkinsfile).
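
One of those jobs/*.groovy files might look something like this (a sketch; the repo, branch, and credentials id are placeholders):

pipelineJob('foo-service') {
    definition {
        cpsScm {
            scm {
                git {
                    remote {
                        url('git@github.com:foo/foo-service.git')
                        credentials('github-creds')
                    }
                    branch('main')
                }
            }
            scriptPath('Jenkinsfile')
        }
    }
}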

This should allow you to recreate your Jenkins instance without any manual fiddling, and provide an audit trail of any changes.