call_end

    • Pl chevron_right

      Erlang Solutions: Change data capture with Postgres & Elixir

      news.movim.eu / PlanetJabber • 13 December, 2022 • 19 minutes

    CDC is the process of identifying and capturing data changes from the database.

    With CDC, changes to data can be tracked in near real-time, and that information can be used to support a variety of use cases, including auditing, replication, and synchronisation.


    A good example of a use case for CDC is to consider an application which inserts a record into the database and pushes an event to a message queue after the record has been inserted (write-twice).

    Imagine you’re working on an e-commerce application and after an order is created and inserted into the database, you push an OrderCreated event to a message queue. The consumers of the event might do things such as create pick orders for the warehouse, schedule transports for delivery and send an order confirmation email to the customer.

    But what if the application crashes after the order has been inserted into the database but before managing to push the event to the message queue? This is possible due to the fact that you can’t atomically insert the record AND push the message in the same transaction, so if the application crashes after inserting the record to the database but before pushing the event to the queue, the event is lost.

    There are of course workarounds to circumvent this: a simple solution is to “outbox” the event into an outbox table in the same transaction as writing the record, andhen, rely on a CDC process to capture the change to the outbox table and push the event to the message queue. The transaction is atomic and the CDC process can assure the event

    gets delivered to the message queue at-least-once.

    In order to capture changes, CDC typically uses one of two methods: log-based or trigger-based.

    Log-based CDC involves reading the transaction logs of the database to identify changed data, which is the method we’ll use here by utilising Postgres Logical Replication.

    Postgres replication

    There are two modes of replication in Postgres:

    1. Physical replication – Every change from the primary are streamed to replicas via the WAL (Write Ahead Log). This replication is performed byte-by-byte with exact block addresses.

    2. Logical replication – In logical replication the subscriber receives each individual transactional change (i.e. INSERT, UPDATE, or DELETE statements) to the database.

    The WAL is still streamed but it encodes the logical operations so that they can be decoded by the subscriber without having to know Postgres internals.

    One of the great things about logical replication is that it can be used to only replicate specific tables or rows, meaning that you have complete control over what is being replicated.

    To enable logical replication the `wal_level` need to be set:

    ```sql
    -- determines how much information is written to the wal. 
    -- Each 'level' inherits the level below it; 'logical' is the highest level
    
    ALTER SYSTEM SET wal_level=logical;
    
    -- simultaneously running WAL sender processes
    ALTER SYSTEM SET max_wal_senders='10';
    
    -- simultaneously defined replication slots
    ALTER SYSTEM SET max_replication_slots='10';
    ```
    

    The changes require a restart to the Postgres instance.

    After the system has been restarted the `wal_level` can be verified with:

    ```sql
    SHOW wal_level;
     wal_level 
    -----------
     logical
    (1 row)
    

    In order to subscribe to changes a [publication](https://www.postgresql.org/docs/current/sql-createpublication.html) must be created. A publication is a group of tables in which we would like to receive data changes for.

    Let’s create a simple table and define a publication for it:

    ```sql
    CREATE TABLE articles (id serial PRIMARY KEY, title text, description text, body text);
    CREATE PUBLICATION articles_pub FOR TABLE articles;
    ```
    

    To tell postgres to retain WAL segments we must create a [replication slot](https://www.postgresql.org/docs/current/logicaldecoding-explanation.html).

    The replication slot represents a stream of changes from one or more publications and is used to prevent data loss in the event of a server failure, as they are crash safe.


    Replication Protocol

    In order to get a feel for the protocol and messages being sent we can use [`pg_recvlogical`](https://www.postgresql.org/docs/current/app-pgrecvlogical.html) to start a replication subscriber:

    ```sh
    # Start and use the publication defined above
    # output is written to stdout
    pg_recvlogical --start \
      --host='localhost' \
      --port='5432' \
      --username='postgres' \
      --dbname='postgres' \
      --option=publication_names='articles_pub' \
      --option=proto_version=1 \
      --create-slot \
      --if-not-exists \
      --slot=articles_slot \
      --plugin=pgoutput \
      --file=-
    ```
    

    Insert a record:

    ```sql
    INSERT INTO articles (title, description, body)
        VALUES ('Postgres replication', 'Using logical replication', 'Foo bar baz');
    ```
    

    Each row in the output corresponds to a replication messages received through the subscription:

    ```
    B(egin) - Begin transaction 
    R(elation) - Table, schema, columns and their types
    I(insert) - Data being inserted
    C(ommit) - Commit transaction
    ```
    
    
    
    ```
    B
    
    Rarticlesdidtitledescriptionbody
    It35tPostgres replicationtUsing logical replicationtFoo bar baz
    C
    ```
    

    If we insert multiple records in a transaction we should have two I in between B and C:

    ```sql
    BEGIN;
    INSERT INTO articles (title, description, body) VALUES ('First', 'desc', 'Foo');
    
    INSERT INTO articles (title, description, body) VALUES ('Second', 'desc', 'Bar');
    COMMIT;
    ```
    
    And the ouput:
    ```
    C
    B
    
    It37tFirsttdesctFoo
    It38tSecondtdesctBar
    C
    ```
    
    

    The relation i.e table information was not transmitted since we already received the relation when inserting the first record.

    Postgres only sends the relation the first time it’s encountered during the session. The subscriber is expected to cache a previously sent relation.

    Now that we have a feel for how Logical replication works, let’s implement it in Elixir!

    Implementing the replication connection

    Create a new Elixir project:

    ```
    mix new cdc
    ```
    
    We'll add the following dependencies to `mix.exs`
    
    ```elixir
    defp deps do
      {:postgrex, "~> 0.16.4"},
      # decode/encode replication messages
      {:postgrex_pgoutput, "~> 0.1.0"}
    end
    ```
    
    
    `postgrex` supports replication through the [`Postgrex.ReplicationConnection`](https://hexdocs.pm/postgrex/Postgrex.ReplicationConnection.html) process.
    
    
    ```elixir
    defmodule CDC.Replication do
      use Postgrex.ReplicationConnection
      require Logger
    
      defstruct [
        :publications,
        :slot,
        :state
      ]
    
      def start_link(opts) do
        conn_opts = [auto_reconnect: true]
        publications = opts[:publications] || raise ArgumentError, message: "`:publications` is required"
        slot = opts[:slot] || raise ArgumentError, message: "`:slot` is required"
    
        Postgrex.ReplicationConnection.start_link(
          __MODULE__,
          {slot, publications},
          conn_opts ++ opts
        )
      end
    
      @impl true
      def init({slot, pubs}) do
        {:ok, %__MODULE__{slot: slot, publications: pubs}}
      end
    
      
      @impl true
      def handle_connect(%__MODULE__{slot: slot} = state) do
        query = "CREATE_REPLICATION_SLOT #{slot} TEMPORARY LOGICAL pgoutput NOEXPORT_SNAPSHOT"
    
        Logger.debug("[create slot] query=#{query}")
    
        {:query, query, %{state | state: :create_slot}}
      end
    
      @impl true
      def handle_result([%Postgrex.Result{} | _], %__MODULE__{state: :create_slot, publications: pubs, slot: slot} = state) do
        opts = [proto_version: 1, publication_names: pubs]
    
        query = "START_REPLICATION SLOT #{slot} LOGICAL 0/0 #{escape_options(opts)}"
    
        Logger.debug("[start streaming] query=#{query}")
    
        {:stream, query, [], %{state | state: :streaming}}
      end
    
      @impl true
      def handle_data(msg, state) do
        Logger.debug("Received msg=#{inspect(msg, limit: :infinity, pretty: true)}")
        {:noreply, [], state}
      end
    
      defp escape_options([]),
        do: ""
    
      defp escape_options(opts) do
        parts =
          Enum.map_intersperse(opts, ", ", fn {k, v} -> [Atom.to_string(k), ?\s, escape_string(v)] end)
    
        [?\s, ?(, parts, ?)]
      end
    
      defp escape_string(value) do
        [?', :binary.replace(to_string(value), "'", "''", [:global]), ?']
      end
    end
    ```
    

    The code is available on GitHub

    Let’s try it out:

    ```elixir
    opts = [
      slot: "articles_slot_elixir",
      publications: ["articles_pub"],
      host: "localhost",
      database: "postgres",
      username: "postgres",
      password: "postgres",
      port: 5432,
    ]
    
    CDC.Replication.start_link(opts)
    ```
    

    When we start the process the following is happening:

    1. Once we are connected to postgres the callback `handle_connect/1` is called, a temporary logical replication slot is created.

    2. `handle_result/2` is called with the result from the query in ‘1’. If the slot was created successfully we start streaming from the slot and go into streaming mode. The requested position ‘0/0’ means that Postgres picks the position.

    3. Any replication messages sent from postgres are received in the `handle_data/2` callback.

    Replication messages

    There are two types of messages a subscriber receives:

    1. `primary_keep_alive` – A checkin message, if `reply == 1` the subscriber is expected to reply to the message with a `standy_status_update` to avoid a timeout disconnect.

    The `standy_status_update` contains the current [LSN](https://www.postgresql.org/docs/current/datatype-pg-lsn.html) the subscriber has processed.

    Postgres uses this message to determine which WAL segments can be safely removed.

    2. `xlog_data` – Contains the data messages for each step in a transaction.

    Since we are not responding to the `primary_keep_alive` messages the process gets disconnected and restarts
    .

    Let’s fix it by decoding the messages and start replying with `standby_status_update` messages.

    ```elixir
    defmodule CDC.Replication do
      use Postgrex.ReplicationConnection
    
      require Postgrex.PgOutput.Messages
      alias Postgrex.PgOutput.{Messages, Lsn}
    
      require Logger
    
      defstruct [
        :publications,
        :slot,
        :state
      ]
    
      def start_link(opts) do
        conn_opts = [auto_reconnect: true]
        publications = opts[:publications] || raise ArgumentError, message: "`:publications` is required"
        slot = opts[:slot] || raise ArgumentError, message: "`:slot` is required"
    
        Postgrex.ReplicationConnection.start_link(
          __MODULE__,
          {slot, publications},
          conn_opts ++ opts
        )
      end
    
      @impl true
      def init({slot, pubs}) do
        {:ok, %__MODULE__{slot: slot, publications: pubs}}
      end
    
      @impl true
      def handle_connect(%__MODULE__{slot: slot} = state) do
        query = "CREATE_REPLICATION_SLOT #{slot} TEMPORARY LOGICAL pgoutput NOEXPORT_SNAPSHOT"
    
        Logger.debug("[create slot] query=#{query}")
    
        {:query, query, %{state | state: :create_slot}}
      end
    
      @impl true
      def handle_result(
            [%Postgrex.Result{} | _],
            %__MODULE__{state: :create_slot, publications: pubs, slot: slot} = state
          ) do
        opts = [proto_version: 1, publication_names: pubs]
    
        query = "START_REPLICATION SLOT #{slot} LOGICAL 0/0 #{escape_options(opts)}"
    
        Logger.debug("[start streaming] query=#{query}")
    
        {:stream, query, [], %{state | state: :streaming}}
      end
    
      @impl true
      def handle_data(msg, state) do
        return_msgs =
          msg
          |> Messages.decode()
          |> handle_msg()
    
        {:noreply, return_msgs, state}
      end
    
      #
      defp handle_msg(Messages.msg_primary_keep_alive(server_wal: lsn, reply: 1)) do
        Logger.debug("msg_primary_keep_alive message reply=true")
        <<lsn::64>> = Lsn.encode(lsn)
    
        [standby_status_update(lsn)]
      end
    
      defp handle_msg(Messages.msg_primary_keep_alive(reply: 0)), do: []
    
      defp handle_msg(Messages.msg_xlog_data(data: data)) do
        Logger.debug("xlog_data message: #{inspect(data, pretty: true)}")
        []
      end
    
      defp standby_status_update(lsn) do
        [
          wal_recv: lsn + 1,
          wal_flush: lsn + 1,
          wal_apply: lsn + 1,
          system_clock: Messages.now(),
          reply: 0
        ]
        |> Messages.msg_standby_status_update()
        |> Messages.encode()
      end
    
      
    defp escape_options([]),
        do: ""
    
      defp escape_options(opts) do
        parts =
          Enum.map_intersperse(opts, ", ", fn {k, v} -> [Atom.to_string(k), ?\s, escape_string(v)] end)
    
        [?\s, ?(, parts, ?)]
      end
    
      defp escape_string(value) do
        [?', :binary.replace(to_string(value), "'", "''", [:global]), ?']
      end
    end
    ```
    


    `handle_data/2` decodes the message and passes it to `handle_msg/1`.  If it’s a `primary_keep_alive` we respond with a `standby_status_update`.

    The LSN denotes a byte position in the WAL.

    The subscriber responds with the LSN it has currently handled, since we are not tracking the messages we receive, we just ack with the LSN sent from the server.

    Next we’ll handle `xlog_data` messages, the idea here is that we’ll capture each operation into a Transaction struct.

    Capturing transactions

    The `CDC.Protocol` module will handle `xlog_data` messages and track the state of the transaction.

    ```elixir
    defmodule CDC.Protocol do
      import Postgrex.PgOutput.Messages
      require Logger
    
      alias CDC.Tx
      alias Postgrex.PgOutput.Lsn
    
      
    @type t :: %__MODULE__{
              tx: Tx.t(),
              relations: map()
            }
    
      defstruct [
        :tx,
        relations: %{}
      ]
    
      @spec new() :: t()
      def new do
        %__MODULE__{}
      end
    
      def handle_message(msg, state) when is_binary(msg) do
        msg
        |> decode()
        |> handle_message(state)
      end
    
      def handle_message(msg_primary_keep_alive(reply: 0), state), do: {[], nil, state}
      def handle_message(msg_primary_keep_alive(server_wal: lsn, reply: 1), state) do
        Logger.debug("msg_primary_keep_alive message reply=true")
        <<lsn::64>> = Lsn.encode(lsn)
    
        {[standby_status_update(lsn)], nil, state}
      end
    
      def handle_message(msg, %__MODULE__{tx: nil, relations: relations} = state) do
        tx =
          [relations: relations, decode: true]
          |> Tx.new()
          |> Tx.build(msg)
    
        {[], nil, %{state | tx: tx}}
      end
    
      def handle_message(msg, %__MODULE__{tx: tx} = state) do
        case Tx.build(tx, msg) do
          %Tx{state: :commit, relations: relations} ->
            tx = Tx.finalize(tx)
            relations = Map.merge(state.relations, relations)
            {[], tx, %{state | tx: nil, relations: relations}}
    
          tx ->
            {[], nil, %{state | tx: tx}}
        end
      end
    
      defp standby_status_update(lsn) do
        [
          wal_recv: lsn + 1,
          wal_flush: lsn + 1,
          wal_apply: lsn + 1,
          system_clock: now(),
          reply: 0
        ]
        |> msg_standby_status_update()
        |> encode()
      end
    end
    ```
    `CDC.Tx` handles messages received within a transaction, begin, relation, insert/update/delete and commit.
    
    
    ```elixir
    defmodule CDC.Tx do
      import Postgrex.PgOutput.Messages
      alias Postgrex.PgOutput.Lsn
    
      alias __MODULE__.Operation
    
      @type t :: %__MODULE__{
              operations: [Operation.t()],
              relations: map(),
              timestamp: term(),
              xid: pos_integer(),
              state: :begin | :commit,
              lsn: Lsn.t(),
              end_lsn: Lsn.t()
            }
    
      defstruct [
        :timestamp,
        :xid,
        :lsn,
        :end_lsn,
        relations: %{},
        operations: [],
        state: :begin,
        decode: true
      ]
    
      def new(opts \\ []) do
        struct(__MODULE__, opts)
      end
    
      def finalize(%__MODULE__{state: :commit, operations: ops} = tx) do
        %{tx | operations: Enum.reverse(ops)}
      end
    
      def finalize(%__MODULE__{} = tx), do: tx
    
      @spec build(t(), tuple()) :: t()
      def build(tx, msg_xlog_data(data: data)) do
        build(tx, data)
      end
    
      def build(tx, msg_begin(lsn: lsn, timestamp: ts, xid: xid)) do
        %{tx | lsn: lsn, timestamp: ts, xid: xid, state: :begin}
      end
    
      def build(%__MODULE__{state: :begin, relations: relations} = tx, msg_relation(id: id) = rel) do
        %{tx | relations: Map.put(relations, id, rel)}
      end
    
      def build(%__MODULE__{state: :begin, lsn: tx_lsn} = tx, msg_commit(lsn: lsn, end_lsn: end_lsn))
          when tx_lsn == lsn do
        %{tx | state: :commit, end_lsn: end_lsn}
      end
    
      def build(%__MODULE__{state: :begin} = builder, msg_insert(relation_id: id) = msg),
        do: build_op(builder, id, msg)
    
      def build(%__MODULE__{state: :begin} = builder, msg_update(relation_id: id) = msg),
        do: build_op(builder, id, msg)
    
      def build(%__MODULE__{state: :begin} = builder, msg_delete(relation_id: id) = msg),
        do: build_op(builder, id, msg)
    
      # skip unknown messages
      def build(%__MODULE__{} = tx, _msg), do: tx
    
      defp build_op(%__MODULE__{state: :begin, relations: rels, decode: decode} = tx, id, msg) do
        rel = Map.fetch!(rels, id)
        op = Operation.from_msg(msg, rel, decode)
    
        %{tx | operations: [op | tx.operations]}
      end
    end
    ```
    
    
    `CDC.Tx.Operation` handles INSERT/UPDATE/DELETE messages and decodes the data by combining it with the relation 
    
    
    ```elixir
    defmodule CDC.Tx.Operation do
      @moduledoc "Describes a change (INSERT, UPDATE, DELETE) within a transaction."
    
      import Postgrex.PgOutput.Messages
      alias Postgrex.PgOutput.Type, as: PgType
    
      @type t :: %__MODULE__{}
      defstruct [
        :type,
        :schema,
        :namespace,
        :table,
        :record,
        :old_record,
        :timestamp
      ]
    
      @spec from_msg(tuple(), tuple(), decode :: boolean()) :: t()
      def from_msg(
            msg_insert(data: data),
            msg_relation(columns: columns, namespace: ns, name: name),
            decode?
          ) do
        %__MODULE__{
          type: :insert,
          namespace: ns,
          schema: into_schema(columns),
          table: name,
          record: cast(data, columns, decode?),
          old_record: %{}
        }
      end
    
      def from_msg(
            msg_update(change_data: data, old_data: old_data),
            msg_relation(columns: columns, namespace: ns, name: name),
            decode?
          ) do
        %__MODULE__{
          type: :update,
          namespace: ns,
          table: name,
          schema: into_schema(columns),
          record: cast(data, columns, decode?),
          old_record: cast(columns, old_data, decode?)
        }
      end
    
      def from_msg(
            msg_delete(old_data: data),
            msg_relation(columns: columns, namespace: ns, name: name),
            decode?
          ) do
        %__MODULE__{
          type: :delete,
          namespace: ns,
          schema: into_schema(columns),
          table: name,
          record: %{},
          old_record: cast(data, columns, decode?)
        }
      end
    
      defp into_schema(columns) do
        for c <- columns do
          c
          |> column()
          |> Enum.into(%{})
        end
      end
    
      defp cast(data, columns, decode?) do
        Enum.zip_reduce([data, columns], %{}, fn [text, typeinfo], acc ->
          key = column(typeinfo, :name)
    
          value =
            if decode? do
              t =
                typeinfo
                |> column(:type)
                |> PgType.type_info()
    
              PgType.decode(text, t)
            else
              text
            end
    
          Map.put(acc, key, value)
        end)
      end
    end
    ```
    

    As before, the `primary_keep_alive` message with `reply == 1` sends a `standby_status_update`. When we receive an `xlog_data` message, we create a new `%Tx{}` which we use to “build” the transaction until we receive a `msg_commit` which marks the end of the transaction.

    Any insert, update, delete messages creates an `CDC.Tx.Operation` in the transaction, each operation contains a `relation_id` which is used to look up the relation from `tx.relations`.

    The operation together with the relation enables us to decode the data. Column and type information is retrieved from the relation and is used to decode the values into elixir terms.
    .

    Once we are in a `commit` state we merge `Tx.relations` with `Protocol.relations` since a relation message will only be transmitted the first time a table is encountered during the connection session, `Protocol.relations` contains all `msg_relation` we’ve been sent during the session.

    The `CDC.Replication` module now looks like this:

    ```elixir
    defmodule CDC.Replication do
      use Postgrex.ReplicationConnection
    
      alias CDC.Protocol
    
      require Logger
    
      defstruct [
        :publications,
        :protocol,
        :slot,
        :state
      ]
    
      def start_link(opts) do
        conn_opts = [auto_reconnect: true]
        publications = opts[:publications] || raise ArgumentError, message: "`:publications` is required"
        slot = opts[:slot] || raise ArgumentError, message: "`:slot` is required"
    
        Postgrex.ReplicationConnection.start_link(
          __MODULE__,
          {slot, publications},
          conn_opts ++ opts
        )
      end
    
      @impl true
      def init({slot, pubs}) do
        {:ok,
         %__MODULE__{
           slot: slot,
           publications: pubs,
           protocol: Protocol.new()
         }}
      end
    
      @impl true
      def handle_connect(%__MODULE__{slot: slot} = state) do
        query = "CREATE_REPLICATION_SLOT #{slot} TEMPORARY LOGICAL pgoutput NOEXPORT_SNAPSHOT"
    
        Logger.debug("[create slot] query=#{query}")
    
        {:query, query, %{state | state: :create_slot}}
      end
    
      @impl true
      def handle_result(
            [%Postgrex.Result{} | _],
            %__MODULE__{state: :create_slot, publications: pubs, slot: slot} = state
          ) do
        opts = [proto_version: 1, publication_names: pubs]
    
        query = "START_REPLICATION SLOT #{slot} LOGICAL 0/0 #{escape_options(opts)}"
    
        Logger.debug("[start streaming] query=#{query}")
    
        {:stream, query, [], %{state | state: :streaming}}
      end
    
      @impl true
      def handle_data(msg, state) do
        {return_msgs, tx, protocol} = Protocol.handle_message(msg, state.protocol)
    
        if not is_nil(tx) do
          Logger.debug("Tx: #{inspect(tx, pretty: true)}")
        end
    
        {:noreply, return_msgs, %{state | protocol: protocol}}
      end
    
      
    defp escape_options([]),
        do: ""
    
      defp escape_options(opts) do
        parts =
          Enum.map_intersperse(opts, ", ", fn {k, v} -> [Atom.to_string(k), ?\s, escape_string(v)] end)
    
        [?\s, ?(, parts, ?)]
      end
    
      defp escape_string(value) do
        [?', :binary.replace(to_string(value), "'", "''", [:global]), ?']
      end
    end
    ```
    

    `handle_data/2` calls `Protocol.handle_message/1` which returns a tuple with three elements `{messages_to_send :: [binary()], complete_transaction :: CDC.Tx.t() | nil, CDC.Protocol.t()}`

    For now we just inspect the transaction when it’s emitted from `Protocol.handle_message/3`, let’s try it out:

    ```elixir
    Interactive Elixir (1.14.0) - press Ctrl+C to exit (type h() ENTER for help)
    opts = [
      slot: "articles_slot_elixir",
      publications: ["articles_pub"],
      host: "localhost",
      database: "postgres",
      username: "postgres",
      password: "postgres",
      port: 5432,
    ]
    
    {:ok, _} = CDC.Replication.start_link(opts)
    {:ok, pid} = Postgrex.start_link(opts)
    
    insert_query = """
    INSERT INTO articles (title, description, body) 
    VALUES ('Postgres replication', 'Using logical replication', 'with Elixir!')
    """
    
    _ = Postgrex.query!(pid, insert_query, [])
      
    14:03:48.020 [debug] Tx: %CDC.Tx{
      timestamp: ~U[2022-10-31 13:03:48Z],
      xid: 494,
      lsn: {0, 22981920},
      end_lsn: nil,
      relations: %{
        16386 => {:msg_relation, 16386, "public", "articles", :default,
         [
           {:column, [:key], "id", :int4, -1},
           {:column, [], "title", :text, -1},
           {:column, [], "description", :text, -1},
           {:column, [], "body", :text, -1}
         ]}
      },
      operations: [
        %CDC.Tx.Operation{
          type: :insert,
          schema: [
            %{flags: [:key], modifier: -1, name: "id", type: :int4},
            %{flags: [], modifier: -1, name: "title", type: :text},
            %{flags: [], modifier: -1, name: "description", type: :text},
            %{flags: [], modifier: -1, name: "body", type: :text}
          ],
          namespace: "public",
          table: "articles",
          record: %{
            "body" => "with Elixir!",
            "description" => "Using logical replication",
            "id" => 6,
            "title" => "Postgres replication"
          },
          old_record: %{},
          timestamp: nil
        }
      ],
      state: :begin,
      decode: true
    }
    ```
    

    Each change in the transaction is stored in `Tx.operations`, `operation.record` is the decoded row as a map.
    Finally let’s implement a way to subscribe to changes from `CDC.Replication`:

    defmodule CDC.Replication do
      use Postgrex.ReplicationConnection
    
      alias CDC.Protocol
    
      require Logger
    
      defstruct [
        :publications,
        :protocol,
        :slot,
        :state,
        subscribers: %{}
      ]
    
      def start_link(opts) do
        conn_opts = [auto_reconnect: true]
        publications = opts[:publications] || raise ArgumentError, message: "`:publications` is required"
        slot = opts[:slot] || raise ArgumentError, message: "`:slot` is required"
    
        Postgrex.ReplicationConnection.start_link(
          __MODULE__,
          {slot, publications},
          conn_opts ++ opts
        )
      end
    
      def subscribe(pid, opts \\ []) do
        Postgrex.ReplicationConnection.call(pid, :subscribe, Keyword.get(opts, :timeout, 5_000))
      end
    
      def unsubscribe(pid, ref, opts \\ []) do
        Postgrex.ReplicationConnection.call(
          pid,
          {:unsubscribe, ref},
          Keyword.get(opts, :timeout, 5_000)
        )
      end
    
      @impl true
      def init({slot, pubs}) do
        {:ok,
         %__MODULE__{
           slot: slot,
           publications: pubs,
           protocol: Protocol.new()
         }}
      end
    
      @impl true
      def handle_connect(%__MODULE__{slot: slot} = state) do
        query = "CREATE_REPLICATION_SLOT #{slot} TEMPORARY LOGICAL pgoutput NOEXPORT_SNAPSHOT"
    
        Logger.debug("[create slot] query=#{query}")
    
        {:query, query, %{state | state: :create_slot}}
      end
    
      @impl true
      def handle_result(
            [%Postgrex.Result{} | _],
            %__MODULE__{state: :create_slot, publications: pubs, slot: slot} = state
          ) do
        opts = [proto_version: 1, publication_names: pubs]
    
        query = "START_REPLICATION SLOT #{slot} LOGICAL 0/0 #{escape_options(opts)}"
    
        Logger.debug("[start streaming] query=#{query}")
    
        {:stream, query, [], %{state | state: :streaming}}
      end
    
      @impl true
      def handle_data(msg, state) do
        {return_msgs, tx, protocol} = Protocol.handle_message(msg, state.protocol)
    
        if not is_nil(tx) do
          notify(tx, state.subscribers)
        end
    
        {:noreply, return_msgs, %{state | protocol: protocol}}
      end
    
      # Replies must be sent using `reply/2`
      # https://hexdocs.pm/postgrex/Postgrex.ReplicationConnection.html#reply/2
      @impl true
      def handle_call(:subscribe, {pid, _} = from, state) do
        ref = Process.monitor(pid)
    
        state = put_in(state.subscribers[ref], pid)
    
        Postgrex.ReplicationConnection.reply(from, {:ok, ref})
    
        {:noreply, state}
      end
    
      def handle_call({:unsubscribe, ref}, from, state) do
        {reply, new_state} =
          case state.subscribers do
            %{^ref => _pid} ->
              Process.demonitor(ref, [:flush])
    
              {_, state} = pop_in(state.subscribers[ref])
              {:ok, state}
    
            _ ->
              {:error, state}
          end
    
        from && Postgrex.ReplicationConnection.reply(from, reply)
    
        {:noreply, new_state}
      end
    
      @impl true
      def handle_info({:DOWN, ref, :process, _, _}, state) do
        handle_call({:unsubscribe, ref}, nil, state)
      end
    
      defp notify(tx, subscribers) do
        for {ref, pid} <- subscribers do
          send(pid, {:notification, self(), ref, tx})
        end
    
        :ok
      end
    
      defp escape_options([]),
        do: ""
    
      defp escape_options(opts) do
        parts =
          Enum.map_intersperse(opts, ", ", fn {k, v} -> [Atom.to_string(k), ?\s, escape_string(v)] end)
    
        [?\s, ?(, parts, ?)]
      end
    
      defp escape_string(value) do
        [?', :binary.replace(to_string(value), "'", "''", [:global]), ?']
      end
    end
    

    And we can use it like this:

    ```elixir
    opts = [
      slot: "articles_slot",
      publications: ["articles_pub"],
      host: "localhost",
      database: "postgres",
      username: "postgres",
      password: "postgres",
      port: 5432,
    ]
    
    {:ok, pid} = CDC.Replication.start_link(opts)
    {:ok, pg_pid} = Postgrex.start_link(opts)
    {:ok, ref} = CDC.Replication.subscribe(pid)
    
    insert_query = """
    INSERT INTO articles (title, description, body) 
    VALUES ('Postgres replication', 'Using logical replication', 'with Elixir!')
    """
    
    _ = Postgrex.query!(pg_pid, insert_query, [])
    flush()
    
    {:notification, #PID<0.266.0>, #Reference<0.2499916608.3416784901.94813>,
     %CDC.Tx{
       timestamp: ~U[2022-10-31 13:26:35Z],
       xid: 495,
       lsn: {0, 22983536},
       end_lsn: nil,
       relations: %{
         16386 => {:msg_relation, 16386, "public", "articles", :default,
          [
            {:column, [:key], "id", :int4, -1},
            {:column, [], "title", :text, -1},
            {:column, [], "description", :text, -1},
            {:column, [], "body", :text, -1}
          ]}
       },
       operations: [
         %CDC.Tx.Operation{
           type: :insert,
           schema: [
             %{flags: [:key], modifier: -1, name: "id", type: :int4},
             %{flags: [], modifier: -1, name: "title", type: :text},
             %{flags: [], modifier: -1, name: "description", type: :text},
             %{flags: [], modifier: -1, name: "body", type: :text}
           ],
           namespace: "public",
           table: "articles",
           record: %{
             "body" => "with Elixir!",
             "description" => "Using logical replication",
             "id" => 7,
             "title" => "Postgres replication"
           },
           old_record: %{},
           timestamp: nil
         }
       ],
       state: :begin,
       decode: true
     }}
    ```
    

    Conclusion

    If you’re looking for a way to capture changes from your database with minimal changes to your existing setup, Changing Data Capture is definitely worth considering. With Elixir and postgrex we’ve implemented a mini Debezium in ~400 LOC. Full source is available here( https://github.com/drowzy/postgrex_pgoutput/tree/main/examples/cdc).

    If you need help with your Elixir implementation our world-leading team of experts is always here to help. Contact us today to find out how we can help you.

    The post Change data capture with Postgres & Elixir appeared first on Erlang Solutions .