Bulk SQL projections with F# and type providers
Early summer, I had to set up an integration with an external partner. They required us to provide them daily with a relational dataset stored in SQL Server. Most, if not all, of the data was temporal and append-only by nature; think logins, financial transactions and the like.
Since the required data largely lived in an eventstore on our end, I needed fast bulk projections. Having experimented with a few approaches, I eventually settled on projections written in F#, taking advantage of type providers.
Let’s say we have an event for when users watched a video and one for when users shared a video.
type Events =
    | WatchedVideo of payload : WatchedVideo
    | SharedVideo of payload : SharedVideo
and WatchedVideo = { UserId : string; Title : string; Timestamp : DateTime }
and SharedVideo = { UserId : string; Title : string; Timestamp : DateTime }
We want to take streams from our eventstore and project them to a specific state; a stream goes in and state comes out.
type EventStream = seq<Events>
type Projection<'state> = EventStream -> 'state
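As a trivial illustration of that signature (not something we'll use later), a projection that counts watched videos is just a function from a stream to an int.
let countWatchedVideos : Projection<int> =
    Seq.sumBy (fun e ->
        match e with
        | WatchedVideo _ -> 1
        | _ -> 0)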
Then we want to take that state, and store it in our SQL Server database.
type StoreState<'state> = SqlConnection -> SqlTransaction -> 'state -> unit
Some infrastructure that reads a specific stream, runs the projection, stores the state and checkpoints the projection could look like this.
let runSqlProjection input =
    let checkpoint = readCheckpoint input.ProjectionName
    let events = readEvents input.SourceStreamName checkpoint
    // Assumes at least one new event; Seq.max throws on an empty sequence
    let newCheckpoint = events |> Seq.map (fun x -> x.EventId) |> Seq.max
    let state = events |> unwrap |> input.Projection
    use conn = new SqlConnection(destinationConnectionString)
    conn.Open()
    use tx = conn.BeginTransaction(IsolationLevel.ReadCommitted)
    input.StoreState conn tx state
    writeCheckpoint input.ProjectionName newCheckpoint conn tx
    tx.Commit()
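The input argument is a record tying these pieces together; something along these lines (readCheckpoint, readEvents, unwrap and writeCheckpoint are eventstore helpers I'm leaving out of this post).
type ProjectionInput<'state> = {
    ProjectionName : string
    SourceStreamName : string
    Projection : Projection<'state>
    StoreState : StoreState<'state>
}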
To avoid data corruption, storing the state and writing the checkpoint happen in the same transaction.
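As a minimal sketch of what writeCheckpoint could look like, assuming a Checkpoints table with ProjectionName and Position columns and int64 positions (the table and names here are illustrative, not the actual implementation).
let writeCheckpoint (projectionName : string) (position : int64) (conn : SqlConnection) (tx : SqlTransaction) =
    // Upsert the checkpoint inside the caller's transaction
    use cmd =
        new SqlCommand(
            "UPDATE Checkpoints SET Position = @Position WHERE ProjectionName = @ProjectionName
             IF @@ROWCOUNT = 0
                 INSERT INTO Checkpoints (ProjectionName, Position) VALUES (@ProjectionName, @Position)",
            conn, tx)
    cmd.Parameters.AddWithValue("@ProjectionName", projectionName) |> ignore
    cmd.Parameters.AddWithValue("@Position", position) |> ignore
    cmd.ExecuteNonQuery() |> ignore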
With this piece of infrastructure in place, we are close to implementing an example. But before we do that, we first need to install the FSharp.Data.SqlClient package. Using this package, we can use the SqlProgrammabilityProvider type provider to provide us with types for each table in our destination database. In the snippet below, I’ll create a typed dataset for the WatchedVideos table and add a row.
open FSharp.Data

type Destination = SqlProgrammabilityProvider<"Data Source=...">
// or type Destination = SqlProgrammabilityProvider<"Name=Destination">
// **magic**
let dt = new Destination.dbo.Tables.WatchedVideos()
dt.AddRow("Jef", "Tesla's New Autopilot", DateTime.UtcNow)
I haven’t defined this type, nor was it generated by me. The SqlProgrammabilityProvider type provider gives you these for free, based on the metadata it can extract from the destination database. This also means that when you change your table without changing your code, the compiler will have no mercy and immediately point out where you broke your code. In this use case, where you'd rather rebuild your data than migrate it, the feedback loop of changing your database model becomes so short that it allows you to break stuff with confidence. The only caveat is that the compiler must always be able to access that specific database, or compilation fails. In practice, this means you need to ship your source with a build script that sets up your database locally before you do any work.
Going from a stream to a dataset is quite declarative and straightforward with the help of pattern matching.
let toWatchedVideosDataset stream =
    let dt = new Destination.dbo.Tables.WatchedVideos()
    stream
    |> Seq.iter (fun e ->
        match e with
        | Events.WatchedVideo x -> dt.AddRow(x.UserId, x.Title, x.Timestamp)
        | _ -> ())
    dt
Storing the result in an efficient fashion is also simple, since the dataset directly exposes a BulkCopy method.
let storeState conn tx (state : Destination.dbo.Tables.WatchedVideos) =
    state.BulkCopy(conn, SqlBulkCopyOptions.Default, tx)
When we put this all together, we end up with this composition.
runSqlProjection {
    ProjectionName = "WatchedVideos"
    SourceStreamName = "$all"
    Projection = toWatchedVideosDataset
    StoreState = storeState
}
Executing this program, we can see the data was persisted as expected.
In the real world, you also want to take care of batching and logging, but that isn’t too hard to implement.
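Batching, for instance, can be a small recursive driver over the same building blocks: cap how many events each transaction handles, and let the checkpoint carry you to the next slice. A sketch, under the assumption that readEvents yields events ordered by EventId (catchUp and batchSize are illustrative names, not from the code above).
let rec catchUp batchSize input =
    let checkpoint = readCheckpoint input.ProjectionName
    // Take at most batchSize events per transaction
    let batch =
        readEvents input.SourceStreamName checkpoint
        |> Seq.truncate batchSize
        |> List.ofSeq
    if not (List.isEmpty batch) then
        let newCheckpoint = batch |> List.map (fun x -> x.EventId) |> List.max
        let state = batch |> unwrap |> input.Projection
        use conn = new SqlConnection(destinationConnectionString)
        conn.Open()
        use tx = conn.BeginTransaction(IsolationLevel.ReadCommitted)
        input.StoreState conn tx state
        writeCheckpoint input.ProjectionName newCheckpoint conn tx
        tx.Commit()
        // Recurse until the stream is exhausted
        catchUp batchSize input
Logging fits in the same spots: around the read, the projection and the commit.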
Having had this approach in production for some time now, I'm still quite happy with how it turned out. The implementation is fast, and the code is compact and easy to maintain.