How to make a quick’n’dirty CDC with PostgreSQL
Overview
At ${WORK}, I’ve encountered several technical architectures that rely on quite powerful tools emitting all the events that happen to a database. These tools belong to a category named Change Data Capture.
Among its multiple use cases, here are the ones I encountered:
- Data Offload: streaming changes out of the operational database into another system, such as a data warehouse or a search index;
- Outbox Pattern: reliably publishing domain events by writing them to a dedicated table in the same transaction as the business data, and letting the CDC tool forward them to a message broker.
Multiple tools exist:
- Debezium (the de facto standard)
What are we going to do?
I wanted to take some time to understand how exactly this kind of tool works. And what’s better for that than building our own?
Thus, we’re going to build a small Scala library which will listen to PostgreSQL changes.
For that, we’re going to rely on a few libraries: fs2, scodec and shapeless, and everything in a cats-friendly way.
But wait… How does it work?
A little bit of theory, first…
In general
Most CDC tools work by tailing the database’s transaction log: every committed change ends up there, so reading the log yields an ordered stream of all the modifications applied to the data.
Postgres specific
Since v10, PostgreSQL allows us to subscribe to a stream of changes through Logical Replication: we create a publication listing the tables to watch, and a replication slot from which a client can consume the WAL contents, decoded by an output plugin such as pgoutput.
Let’s play with PostgreSQL
Start a new Database Cluster
Okay, let’s get started! First, let’s write a few small make targets to run a PostgreSQL instance locally:
PGDATA := postgres
# https://github.com/postgres/postgres/blob/2b27273435392d1606f0ffc95d73a439a457f08e/src/backend/replication/pgoutput/pgoutput.c#L123
$(PGDATA)/PG_VERSION:
	initdb \
		-D "$(PGDATA)" \
		-A "trust" \
		-U "postgres"

.PHONY: pg-start
pg-start: $(PGDATA)/PG_VERSION
	postgres \
		-D "$(PGDATA)" \
		-c unix_socket_directories="$(PWD)" \
		-c wal_level="logical" \
		-c max_replication_slots=1
A few words here:
- In order to use PostgreSQL’s Logical Replication, we have to set wal_level to logical so that the database writes enough information to the WAL for row-level changes to be decoded and exported;
- We’re setting max_replication_slots to 1 as there will be only one slot in use.
Using this, we can now start PostgreSQL by running `make pg-start`.
Create a table
We’re going to create a persons table and listen to its changes. Quite easy:
CREATE TABLE
persons (
id SERIAL PRIMARY KEY,
first_name TEXT,
last_name TEXT
);
ALTER TABLE persons REPLICA IDENTITY FULL;
See the REPLICA IDENTITY property? It controls the behavior of the Logical Replication for this table. By using the FULL value, we tell PostgreSQL that we want it to emit the old row entirely in case of UPDATE or DELETE statements.
Insert some rows
We’re going to use a sample file (named persons.txt… Original, right?) to fill our table using the pg-populate target:
[//]: # <<< pg-populate-target >>>
Listen for changes
Okay! Now we have a running PostgreSQL instance and a fake workload which produces rows inside our persons table. We can now listen for the changes! For this, we can use the pg_recvlogical tool.
[//]: # <<< pg-populate-target >>>
Great! We see that changes in the persons table are published, and we have a way to receive them through pg_recvlogical’s standard output. But it’s a binary stream, and we need to decode it in some way in order to use it programmatically.
Decoding pg_recvlogical standard output
Sample creation
When you take a look at pg_recvlogical’s output, you can see that for each change, there is a new line. This is actually not true (there is no such thing as a line abstraction in a binary format), but we’re going to ignore that for now.
The CaptureSpec abstract class
Protocol decoding with the scodec library
Model definition
The protocol used by the logical replication is well defined in the official PostgreSQL documentation. It’s a binary protocol, so we’re going to use the scodec library to decode what’s emitted by pg_recvlogical.
First we need to define all the classes representing the binary messages:
sealed trait Message
object Message {
case class Begin(lsn: LogSequenceNumber, commitInstant: Instant, xid: TransactionID) extends Message
case class Commit(commitLSN: LogSequenceNumber, transactionEndLSN: LogSequenceNumber, commitTimestamp: Instant) extends Message
case class Insert(relationID: RelationID, tupleData: TupleData) extends Message
case class Update(relationID: RelationID, submessage: Submessage, newTupleData: TupleData) extends Message
case class Delete(relationId: RelationID, submessage: Submessage) extends Message
case class Relation(id: RelationID, namespace: RelationNamespace, name: RelationName, replicaIdentitySetting: Int, columns: List[Column]) extends Message
}
Let’s ignore the RelationID and LogSequenceNumber types (as they actually are aliases for Long or Int) and focus on the TupleData one:
[//]: # <<< tuple-data-class-definition >>>
sealed trait Value
object Value {
case object Null extends Value
case class Text(value: ByteVector) extends Value
case object Toasted extends Value
}
The Value type is where the actual value lies:
- The Value.Null case object represents SQL’s NULL value;
- Value.Toasted is for large columns;
- Finally, Value.Text contains actual values (in bytes: the protocol does not express value types).
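To make this concrete, here is a small, hypothetical helper showing how a Value could be rendered for debugging (the render name is ours; pgoutput sends column values in their textual form, hence the UTF-8 decoding):
// Hypothetical debugging helper: render a Value as a human-readable string.
def render(value: Value): String =
  value match {
    case Value.Null        => "NULL"
    case Value.Toasted     => "<toasted value>"
    // Text values hold the column's textual representation, so decoding the
    // bytes as UTF-8 gives back what you would see in psql.
    case Value.Text(bytes) => bytes.decodeUtf8.getOrElse(bytes.toHex)
  }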
Codec implementation
Our model classes are ready, so now we need a way to parse the binary messages coming from pg_recvlogical into actual Message instances. Using scodec, we do that by defining Codecs this way:
[//]: # <<< message-codec >>>
You see that for each kind of message, we have the corresponding Codec: for example, the insert variable is a Codec[Message.Insert] defined like this:
val insert: Codec[Message.Insert] = {
("relationId" | int32) ::
constant('N') ::
("newTupleData" | tupleData)
}.as[Message.Insert]
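Each pgoutput message starts with a one-byte tag identifying its kind, so the per-message Codecs can be assembled into a single Codec[Message] with scodec’s discriminated combinator. Here is a sketch, assuming codecs named begin, commit, update, delete and relation are defined alongside insert:
import scodec.Codec
import scodec.codecs._

// Sketch: dispatch on the leading tag byte to the right per-message codec.
val message: Codec[Message] =
  discriminated[Message]
    .by(byte)
    .typecase('B'.toByte, begin)    // transaction begin
    .typecase('C'.toByte, commit)   // transaction commit
    .typecase('I'.toByte, insert)   // row inserted
    .typecase('U'.toByte, update)   // row updated
    .typecase('D'.toByte, delete)   // row deleted
    .typecase('R'.toByte, relation) // table metadata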
Mapping using the TupleDataReader class and the shapeless library
Quick break
Okay! Let’s do a quick summary of what we’ve done so far:
- We configured PostgreSQL to let it emit changes through its Logical Replication capability;
- We receive the changes by directly invoking the pg_recvlogical binary and capturing its binary output;
- We decoded the binary output to Message instances by using the appropriate scodec Codec instance;
- In the Message.Insert or Message.Update classes, we have access to the actual values of the row through the TupleData type, which is actually a List of Values;
- Among others, a Value may be a Value.Text containing bytes which represent the actual value stored in PostgreSQL.
So what’s remaining now is: how do we convert a Message.Insert instance to a custom case class that will represent the inserted row?
The ValueReader typeclass
First, we’re going to define a ValueReader[T] typeclass which will have the role of trying to convert a Value to a T.
Technically speaking, it’s a trait with a single read method that we’ll have to call to do the conversion:
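In spirit, it could look like the following sketch (returning an Either so that decoding failures are reported is our assumption; the actual definition is in the snippet below):
// Sketch of the typeclass: try to convert a single Value into a T.
trait ValueReader[T] {
  def read(value: Value): Either[String, T]
}

object ValueReader {
  // Summoner, so that we can write ValueReader[T].read(...)
  def apply[T](implicit reader: ValueReader[T]): ValueReader[T] = reader
}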
[//]: # <<< value-reader-trait >>>
As we said, ValueReader[T] is a typeclass, so it requires some instances to work with. For our little project here, we’re only going to define instances for a few primitive types.
Creating a ValueReader[String] is quite straightforward:
[//]: # <<< value-reader-for-string-instance >>>
A ValueReader[Double] is not much more complicated:
[//]: # <<< value-reader-for-double-instance >>>
Using the ValueReader[T] typeclass allows us to map a single Value of a TupleData to a T, but it doesn’t allow us to map an entire TupleData to something else.
The TupleDataReader typeclass
We’re going to use the exact same strategy as above: we define a TupleDataReader[T] with a single read method which will take a TupleData instance and return an instance of T.
[//]: # <<< tuple-data-reader-trait >>>
Again, as it’s a typeclass, it requires some instances.
So for example, if we have a Person case class defined like this:
[//]: # <<< person-class >>>
We can have a TupleDataReader[Person] instance like this:
[//]: # <<< tuple-data-reader-for-person-instance >>>
But… Think about it: it’s a shame that we have to define a TupleDataReader instance for every case class! Since we know the case class in advance, is there a way to derive this instance automatically?
The shapeless library to the rescue!
You’ve probably already heard about the shapeless library. It allows a lot of things, but we’ll focus on one of them: the HList.
Thanks to shapeless, we can express a complex type as a heterogeneous list of types. So, in terms of types, the following HPerson type and Person case class are semantically equivalent:
type HPerson = String :: String :: HNil
case class Person(firstName: String, lastName: String)
The real power of shapeless is that it provides a bijection between the case class world and the heterogeneous-list-of-types world, by using Generic[T] and its from and to methods.
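A quick illustration, using the Person class above:
import shapeless.{Generic, HNil}

val gen = Generic[Person]

// From the case class world to the HList world…
gen.to(Person("Ada", "Lovelace"))     // "Ada" :: "Lovelace" :: HNil

// …and back.
gen.from("Ada" :: "Lovelace" :: HNil) // Person("Ada", "Lovelace")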
You may ask: “What is the point of having a heterogeneous list of types when you have case classes?” And that’s a good question!
Working with lists allows us to take advantage of one of their intrinsic properties: recursivity. A list is either empty (HNil) or a head followed by a tail, so we can build a reader for any HList by induction, as the sketch below shows.
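Here is a sketch of how such a derivation could look. It assumes TupleData wraps a List[Value] and that both typeclasses report failures with Either, consistently with the sketches above:
import shapeless.{::, Generic, HList, HNil}

object TupleDataReaderDerivation {

  // Base case: an exhausted tuple reads as HNil.
  implicit val hnilReader: TupleDataReader[HNil] = {
    case TupleData(Nil) => Right(HNil)
    case TupleData(xs)  => Left(s"${xs.length} unexpected trailing value(s)")
  }

  // Inductive case: read the first column with a ValueReader[H], then
  // recurse on the remaining columns with a TupleDataReader[T].
  implicit def hconsReader[H, T <: HList](implicit
      head: ValueReader[H],
      tail: TupleDataReader[T]
  ): TupleDataReader[H :: T] = {
    case TupleData(Nil) => Left("not enough values in the tuple")
    case TupleData(x +: xs) =>
      for {
        h <- head.read(x)
        t <- tail.read(TupleData(xs))
      } yield h :: t
  }

  // Glue: any case class A whose HList representation R is readable
  // gets a TupleDataReader for free, through Generic.
  implicit def genericReader[A, R <: HList](implicit
      gen: Generic.Aux[A, R],
      reader: TupleDataReader[R]
  ): TupleDataReader[A] =
    tupleData => reader.read(tupleData).map(gen.from)
}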
Embedding everything into fs2 Pipes
Overview
Now that we have all the pieces, we can embed everything into a Stream[F[_], Change[T]] in order to build our Debezium-like library.
We’re going to do that in 3 steps:
- First, a Stream[F[_], Byte] which will invoke pg_recvlogical and retrieve its output;
- Then, a Pipe[F[_], Byte, Message] which will decode the Byte stream to an actual Message stream;
- And finally, a Pipe[F[_], Message, Change[T]] which will convert the Message stream to Change[T] using the TupleDataReader defined above.
Retrieving pg_recvlogical’s output
We first have to define a small CaptureConfig case class which will store all the config values required to connect to the running PostgreSQL instance (user, host, port, etc.).
case class CaptureConfig(
user: String,
password: String,
database: String,
host: String,
port: Int,
slot: String,
publications: List[String]
)
And now, using the fs2-io extension, it’s actually quite easy to invoke the pg_recvlogical process and capture its output:
import cats.effect.{Blocker, Concurrent, ContextShift}
import cats.implicits._
import fs2.{text, Stream}
import fs2.io.readInputStream

def receive[F[_]](config: CaptureConfig)(implicit F: Concurrent[F], cs: ContextShift[F]): Stream[F, Byte] = {
  (for {
    blocker <- Stream.resource[F, Blocker](Blocker[F])
    // Spawn pg_recvlogical, making sure it gets destroyed when the stream terminates
    process <- Stream.bracket[F, Process](F.delay({
      new ProcessBuilder()
        .command("pg_recvlogical",
          "-d", s"${config.database}",
          "-U", s"${config.user}",
          "-h", s"${config.host}",
          "-p", s"${config.port}",
          s"--slot=${config.slot}",
          "--status-interval=1",
          "--fsync-interval=1",
          "--file=-",
          "--no-loop",
          "-v",
          "--option=proto_version=1",
          s"--option=publication_names=${config.publications.mkString(",")}",
          "--plugin=pgoutput",
          "--create-slot",
          "--if-not-exists",
          "--start")
        .start()
    }))({ process =>
      F.delay(process.destroy())
    })
  } yield (blocker, process)).flatMap({ case (blocker, process) =>
    // Stream stdout as bytes; pipe stderr to System.err for visibility
    readInputStream(F.delay(process.getInputStream), 512, blocker)
      .concurrently(readInputStream(F.delay(process.getErrorStream), 512, blocker).through(text.utf8Decode).showLines(System.err))
  })
}
Not a lot to say here, it’s actually quite straightforward.
From Stream[F, Byte]s to Stream[F, Message]
Okay, this one is a little bit tricky. Until now, we considered each message one by one. But pg_recvlogical emits all the changes as a single byte stream: the protocol does not define a fixed message size, and there is no separator to help us split the Byte stream into Message instances.
To bypass that, we’re going to try to decode Byte chunks, and each attempt may either:
- Fail because there are not enough bytes; in that case, we’re just going to read more bytes and try again;
- Succeed, which means retrieving both:
  - An instance of Message that we’re going to emit,
  - Some remaining bytes that we’re going to reuse in the next iteration.
In order to do that, we’re going to switch from fs2’s push mode to its pull mode, implement this algorithm using Pull[F, Message, Unit], and go back to Stream[F, Message] at the end.
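A sketch of that loop (assuming a top-level Codec[Message] named Message.codec, like the discriminated one sketched earlier; the actual messages method follows):
import fs2.{Pipe, Pull, RaiseThrowable, Stream}
import scodec.{Attempt, Err}
import scodec.bits.BitVector

def messages[F[_]: RaiseThrowable]: Pipe[F, Byte, Message] = {
  def go(stream: Stream[F, Byte], buffer: BitVector): Pull[F, Message, Unit] =
    Message.codec.decode(buffer) match {
      // A full message was decoded: emit it and retry on the remainder.
      case Attempt.Successful(result) =>
        Pull.output1(result.value) >> go(stream, result.remainder)
      // Not enough bytes yet: pull one more chunk and try again.
      case Attempt.Failure(_: Err.InsufficientBits) =>
        stream.pull.uncons.flatMap {
          case Some((chunk, rest)) => go(rest, buffer ++ BitVector(chunk.toArray))
          case None                => Pull.done
        }
      // Anything else is a real decoding error.
      case Attempt.Failure(err) =>
        Pull.raiseError[F](new Exception(err.messageWithContext))
    }

  in => go(in, BitVector.empty).stream
}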
[//]: # <<< messages-method >>>
From Stream[F, Message] to Stream[F, Change[T]]
This one is actually easy: we’re just going to keep the Message.Insert, Message.Update and Message.Delete messages, extract the relevant TupleData, and read it as a T using the TupleDataReader.
But first, as we still need to distinguish inserts, updates or deletes, let’s define the Change[T] sealed trait to hold the actual T instance:
[//]: # <<< change-trait >>>
Now, we can define the Change.changes method, which will rely on the TupleDataReader:
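Roughly like this (a sketch: the Change constructor names are assumptions, deletes are omitted for brevity, and decoding failures are surfaced as stream errors):
import cats.ApplicativeError
import fs2.Pipe

def changes[F[_], T](implicit
    F: ApplicativeError[F, Throwable],
    reader: TupleDataReader[T]
): Pipe[F, Message, Change[T]] =
  _.collect {
    // Keep only the messages carrying row data, paired with the right constructor
    // (Message.Delete would be handled the same way, from its submessage's old tuple).
    case Message.Insert(_, tuple)    => (tuple, (t: T) => (Change.Insert(t): Change[T]))
    case Message.Update(_, _, tuple) => (tuple, (t: T) => (Change.Update(t): Change[T]))
  }.evalMap { case (tuple, mk) =>
    // Surface decoding failures in F instead of dropping them silently.
    F.fromEither(reader.read(tuple).map(mk).left.map(new Exception(_)))
  }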
[//]: # <<< changes-method >>>
Assemble everything
Thanks to fs2’s expressiveness, it’s actually straightforward:
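Something along these lines (a sketch wiring together the receive, messages and changes pieces from above; the real capture method follows):
import cats.effect.{Concurrent, ContextShift}
import fs2.Stream

def capture[F[_]: Concurrent: ContextShift, T: TupleDataReader](
    config: CaptureConfig
): Stream[F, Change[T]] =
  receive[F](config)        // spawn pg_recvlogical and stream its stdout
    .through(messages[F])   // Bytes -> Messages
    .through(changes[F, T]) // Messages -> Changes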
[//]: # <<< capture-method >>>
And that’s it, we can now use our library with Change.capture[