Replicate data to Materialize

Learn how to replicate data from Neon to Materialize

Neon's logical replication feature allows you to replicate data from your Neon Postgres database to external destinations.

Materialize is a data warehouse for operational workloads, purpose-built for low-latency applications. You can use it to process data at speeds and scales not possible in traditional databases, but without the cost, complexity, or development time of most streaming engines.

In this guide, you will learn how to stream data from your Neon Postgres database to Materialize using the Materialize PostgreSQL source.

Prerequisites

Enable logical replication

Important

Enabling logical replication modifies the PostgreSQL wal_level configuration parameter, changing it from replica to logical for all databases in your Neon project. Once the wal_level setting is changed to logical, it cannot be reverted. Enabling logical replication also restarts all computes in your Neon project, which drops active connections and requires clients to reconnect.

To enable logical replication in Neon:

  1. Select your project in the Neon Console.
  2. On the Neon Dashboard, select Settings.
  3. Select Logical Replication.
  4. Click Enable to enable logical replication.

You can verify that logical replication is enabled by running the following query:

SHOW wal_level;
 wal_level
-----------
 logical

Create a publication

After logical replication is enabled in Neon, the next step is to create a publication for the tables that you want to replicate to Materialize.

  1. From a psql client connected to your Neon database or from the Neon SQL Editor, set the replica identity to FULL for each table that you want to replicate to Materialize:

    ALTER TABLE <table1> REPLICA IDENTITY FULL;

    REPLICA IDENTITY FULL ensures that the replication stream includes the previous values of changed rows for UPDATE and DELETE operations. This setting allows Materialize to ingest Postgres data with minimal in-memory state.

  2. Create a publication with the tables you want to replicate:

    For specific tables:

    CREATE PUBLICATION mz_source FOR TABLE <table1>, <table2>;

    The mz_source publication will contain the set of change events generated from the specified tables and will later be used to ingest the replication stream.

    Be sure to include only the tables you need. If the publication includes additional tables, Materialize wastes resources on ingesting and then immediately discarding the data from those tables.
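Before moving on, it can help to confirm that the publication and replica identity settings took effect. The following is a minimal sketch that uses the standard Postgres catalogs; the public.orders and public.customers table names are hypothetical stand-ins for the tables you actually published.

```sql
-- List the tables covered by the mz_source publication.
SELECT schemaname, tablename
FROM pg_publication_tables
WHERE pubname = 'mz_source';

-- Check the replica identity of each published table.
-- relreplident = 'f' means FULL; 'd' means the default (primary key).
-- public.orders and public.customers are hypothetical table names.
SELECT oid::regclass AS table_name, relreplident
FROM pg_class
WHERE oid IN ('public.orders'::regclass, 'public.customers'::regclass);
```

If a table you expect is missing from the first result, or shows a relreplident value other than f, revisit the corresponding step above before continuing.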

Create a Postgres role for replication

It is recommended that you create a dedicated Postgres role for replicating data. The role must have the REPLICATION privilege. The default Postgres role created with your Neon project and roles created using the Neon CLI, Console, or API are granted membership in the neon_superuser role, which has the required REPLICATION privilege.

The following CLI command creates a role. To view the CLI documentation for this command, see Neon CLI commands: roles.

neon roles create --name replication_user

Grant schema access to your Postgres role

If your replication role does not own the schemas and tables you are replicating from, make sure to grant access. For example, the following commands grant access to all tables in the public schema to Postgres role replication_user:

GRANT USAGE ON SCHEMA public TO replication_user;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO replication_user;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO replication_user;

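Granting SELECT ON ALL TABLES IN SCHEMA, rather than naming specific tables, covers every table that currently exists in the schema. The ALTER DEFAULT PRIVILEGES statement extends the same grant to tables created later by the role that runs it. Together, these statements avoid having to add privileges each time you add a table to your publication.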
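To check that the grants landed as expected, you can ask Postgres directly with its built-in privilege inquiry functions. This is a small sketch, assuming the replication_user role from the previous step; public.orders is a hypothetical table name.

```sql
-- Returns true if replication_user can use the public schema.
SELECT has_schema_privilege('replication_user', 'public', 'USAGE');

-- Returns true if replication_user can read the (hypothetical) orders table.
SELECT has_table_privilege('replication_user', 'public.orders', 'SELECT');
```

Both queries should return t. If either returns f, rerun the GRANT statements above as a role that owns the schema and tables.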

Allow inbound traffic

If you use Neon's IP Allow feature to limit the IP addresses that can connect to Neon, you will need to allow inbound traffic from Materialize IP addresses. If you are not currently limiting IP address access in Neon, you can skip this step.

  1. From a psql client connected to Materialize or from the Materialize SQL Shell, run this command to find the static egress IP addresses for the Materialize region you are running in:

    SELECT * FROM mz_egress_ips;

  2. In your Neon project, add the IPs to your IP Allow list, which you can find in your project's settings. For instructions, see Configure IP Allow.

Create an ingestion cluster

In Materialize, a cluster is an isolated environment, similar to a virtual warehouse in Snowflake. When you create a cluster, you choose the size of its compute resource allocation based on the work you need the cluster to do, whether ingesting data from a source, computing always-up-to-date query results, serving results to clients, or a combination.

In this case, you’ll create one new cluster containing one medium replica for ingesting source data from your Neon Postgres database.

From a psql client connected to Materialize or from the Materialize SQL Shell, run the CREATE CLUSTER command to create the new cluster:

CREATE CLUSTER ingest_postgres SIZE = 'medium';

Materialize recommends starting with a medium size replica or larger. This helps Materialize quickly process the initial snapshot of the tables in your publication. Once the snapshot is finished, you can right-size the cluster.

Start ingesting data

Now that you’ve configured your database network and created an ingestion cluster, you can connect Materialize to your Neon Postgres database and start ingesting data.

  1. From a psql client connected to Materialize or from the Materialize SQL Shell, use the CREATE SECRET command to securely store the password for the Postgres role you created earlier:

    CREATE SECRET pgpass AS '<PASSWORD>';

    You can access the password for your Neon Postgres role from the Connection Details widget on the Neon Dashboard.

  2. Use the CREATE CONNECTION command to create a connection object with access and authentication details for Materialize to use:

    CREATE CONNECTION pg_connection TO POSTGRES (
        HOST '<host>',
        PORT 5432,
        USER '<role_name>',
        PASSWORD SECRET pgpass,
        SSL MODE 'require',
        DATABASE '<database>'
    );

    You can find the connection details for your replication role in the Connection Details widget on the Neon Dashboard. A Neon connection string looks like this:

    postgresql://alex:AbC123dEf@ep-cool-darkness-123456.us-east-2.aws.neon.tech/dbname?sslmode=require

    • Replace <host> with your Neon hostname (e.g., ep-cool-darkness-123456.us-east-2.aws.neon.tech)
    • Replace <role_name> with the name of your Postgres role (e.g., alex)
    • Replace <database> with the name of the database containing the tables you want to replicate to Materialize (e.g., dbname)

  3. Use the CREATE SOURCE command to connect Materialize to your Neon Postgres database and start ingesting data from the publication you created earlier:

    CREATE SOURCE mz_source
    IN CLUSTER ingest_postgres
    FROM POSTGRES CONNECTION pg_connection (PUBLICATION 'mz_source')
    FOR ALL TABLES;

    Tips

    • To ingest data from specific schemas or tables in your publication, you can use FOR SCHEMAS (<schema1>,<schema2>) or FOR TABLES (<table1>, <table2>) instead of FOR ALL TABLES.
    • After creating a source, you can incorporate upstream schema changes for specific replicated tables using the ALTER SOURCE...{ADD | DROP} SUBSOURCE syntax.
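As an illustration, the statements from steps 2 and 3 might look like the following once the placeholders are filled in. This is a sketch rather than a required configuration: the host and database values come from the sample connection string above, replication_user is the role created earlier in this guide, and the orders and customers tables are hypothetical. The FOR TABLES clause is shown as an alternative to FOR ALL TABLES. Run these in place of the templated statements, not in addition to them.

```sql
-- Sketch only: substitute your own host, role, and database.
CREATE CONNECTION pg_connection TO POSTGRES (
    HOST 'ep-cool-darkness-123456.us-east-2.aws.neon.tech',
    PORT 5432,
    USER 'replication_user',
    PASSWORD SECRET pgpass,
    SSL MODE 'require',
    DATABASE 'dbname'
);

-- Ingest only two (hypothetical) tables from the publication
-- instead of everything it contains.
CREATE SOURCE mz_source
    IN CLUSTER ingest_postgres
    FROM POSTGRES CONNECTION pg_connection (PUBLICATION 'mz_source')
    FOR TABLES (orders, customers);
```

Note that the secret referenced by PASSWORD SECRET must hold the password of the role named in USER, so if you connect as a different role, create the secret with that role's password.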

Check the ingestion status

Before Materialize starts consuming a replication stream, it takes a snapshot of the tables in your publication. Until this snapshot is complete, Materialize won’t have the same view of your data as your Postgres database.

In this step, you’ll verify that the source is running and then check the status of the snapshotting process.

  1. From a psql client connected to Materialize or from the Materialize SQL Shell, use the mz_source_statuses table to check the overall status of your source:

    WITH source_ids AS (
        SELECT id FROM mz_sources WHERE name = 'mz_source'
    )
    SELECT *
    FROM mz_internal.mz_source_statuses
    JOIN (
        SELECT referenced_object_id
        FROM mz_internal.mz_object_dependencies
        WHERE object_id IN (SELECT id FROM source_ids)
        UNION
        SELECT id FROM source_ids
    ) AS sources
        ON mz_source_statuses.id = sources.referenced_object_id;

    For each subsource, make sure the status is running. If you see stalled or failed, there is likely a configuration issue. Check the error field for details and fix the issue before moving on. If any subsource remains in the starting status for more than a few minutes, contact Materialize support.

  2. Once the source is running, use the mz_source_statistics table to check the status of the initial snapshot:

    WITH source_ids AS (
        SELECT id FROM mz_sources WHERE name = 'mz_source'
    )
    SELECT sources.object_id, bool_and(snapshot_committed) AS snapshot_committed
    FROM mz_internal.mz_source_statistics
    JOIN (
        SELECT object_id, referenced_object_id
        FROM mz_internal.mz_object_dependencies
        WHERE object_id IN (SELECT id FROM source_ids)
        UNION
        SELECT id, id FROM source_ids
    ) AS sources
        ON mz_source_statistics.id = sources.referenced_object_id
    GROUP BY sources.object_id;
    object_id | snapshot_committed
    ----------|------------------
    u144     | t
    (1 row)

    Once snapshot_committed is t, move on to the next step. Snapshotting can take anywhere from a few minutes to several hours, depending on the size of your dataset and the size of the cluster replica you chose for your ingest_postgres cluster.
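If you only want to see what is not yet healthy, a narrower query can be easier to read than the full status join. The sketch below assumes that mz_internal.mz_source_statuses exposes name, type, status, and error columns, as the status and error fields mentioned above suggest; it may need adjusting for your Materialize version.

```sql
-- Show only sources and subsources that are not in the running state.
-- An empty result means everything is running.
SELECT name, type, status, error
FROM mz_internal.mz_source_statuses
WHERE status <> 'running';
```

Note that this query is not limited to mz_source, so it also reports any other sources in your Materialize environment that are not running.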

Right-size the cluster

After the snapshotting phase, Materialize starts ingesting change events from the Postgres replication stream. For this work, Materialize generally performs well with an xsmall replica, so you can resize the cluster accordingly.

  1. From a psql client connected to Materialize or from the Materialize SQL Shell, use the ALTER CLUSTER command to downsize the cluster to xsmall:

    ALTER CLUSTER ingest_postgres SET (SIZE 'xsmall');

    Behind the scenes, this command adds a new xsmall replica and removes the medium replica.

  2. Use the SHOW CLUSTER REPLICAS command to check the status of the new replica:

    SHOW CLUSTER REPLICAS WHERE cluster = 'ingest_postgres';
         cluster     | replica |  size  | ready
    -----------------+---------+--------+-------
     ingest_postgres | r1      | xsmall | t
    (1 row)

  3. Going forward, you can verify that your new replica size is sufficient as follows:

    a. From a psql client connected to Materialize or from the Materialize SQL Shell, get the replication slot name associated with your Postgres source from the mz_internal.mz_postgres_sources table:

    SELECT
        d.name AS database_name,
        n.name AS schema_name,
        s.name AS source_name,
        pgs.replication_slot
    FROM
        mz_sources AS s
        JOIN mz_internal.mz_postgres_sources AS pgs ON s.id = pgs.id
        JOIN mz_schemas AS n ON n.id = s.schema_id
        JOIN mz_databases AS d ON d.id = n.database_id;

    b. From a psql client connected to your Neon database or from the Neon SQL Editor, check the replication slot lag, using the replication slot name from the previous step:

    SELECT
        pg_size_pretty(pg_current_wal_lsn() - confirmed_flush_lsn)
        AS replication_lag_bytes
    FROM pg_replication_slots
    WHERE slot_name = '<slot_name>';

    The result of this query is the amount of data your Postgres cluster must retain in its replication log because of this replication slot. Typically, this means Materialize has not yet communicated back to your Neon Postgres database that it has committed this data. A high value can indicate that the source has fallen behind and that you might need to scale up your ingestion cluster.
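For ongoing monitoring, it can be convenient to look at every logical replication slot at once instead of one slot by name. The following sketch runs against your Neon database and uses only standard Postgres functions and the pg_replication_slots view; how much lag counts as too much depends on your workload and is not something this guide specifies.

```sql
-- List all logical replication slots, largest retained WAL first.
-- active = f means no consumer is currently connected to the slot.
SELECT
    slot_name,
    active,
    pg_size_pretty(
        pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)
    ) AS replication_lag
FROM pg_replication_slots
WHERE slot_type = 'logical'
ORDER BY pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) DESC;
```

A slot whose lag keeps growing while active is t suggests the consumer is connected but not keeping up, which is the case where scaling up the ingestion cluster is worth considering.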

Next steps

With Materialize ingesting your Postgres data into durable storage, you can start exploring the data, computing real-time results that stay up-to-date as new data arrives, and serving results efficiently.
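As one example of computing results that stay up to date, you could define a materialized view over a replicated table. This is a sketch under assumptions: orders is a hypothetical replicated table with hypothetical customer_id and amount columns, and the view is created in whichever cluster is active in your session.

```sql
-- Incrementally maintained per-customer totals over a hypothetical
-- replicated orders table.
CREATE MATERIALIZED VIEW customer_order_totals AS
SELECT
    customer_id,
    count(*) AS order_count,
    sum(amount) AS total_amount
FROM orders
GROUP BY customer_id;

-- Query the view like a regular table; results reflect new changes
-- as they arrive from the replication stream.
SELECT *
FROM customer_order_totals
ORDER BY total_amount DESC
LIMIT 10;
```

You may prefer to run views like this in a cluster separate from ingest_postgres so that query work does not compete with ingestion for resources.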

Need help?

Join our Discord Server to ask questions or see what others are doing with Neon. Users on paid plans can open a support ticket from the console. For more details, see Getting Support.
