Bulk Updates and Inserts with PostgreSQL using Composite Types

Something that often comes up in the PostgreSQLCopyHelper and PgBulkInsert issue trackers is how to do bulk updates with the libraries. Simple answer is: It's not possible with the libraries. The underlying Postgres COPY protocol only supports inserts.

In SQL Server I've always used Table-valued Parameters (TVP) to send a batch of data over the wire and perform a MERGE statement on the bulk data. So let's see how to do something similar in Postgres using composite types and Npgsql.

All code can also be found in a Gist at:

Implementation

Imagine we want to bulk insert or update measurements of a device.

We probably come up with a table like this:

CREATE TABLE sample.measurements
(
  device_id int,
  parameter_id int,
  timestamp timestamp with time zone,
  value double precision
);

We expect only a single value for a device and parameter at a given timestamp.

So let's add an index to the sample.measurements table:

DROP INDEX IF EXISTS sample.unique_measurement;

CREATE UNIQUE INDEX unique_measurement ON sample.measurements(device_id, parameter_id, timestamp);

Now we can create a Composite Type, that matches the measurements table structure:

DROP TYPE IF EXISTS "sample"."measurement_type";

CREATE TYPE "sample"."measurement_type" AS (
  device_id int,
  parameter_id int,
  timestamp timestamp with time zone,
  value double precision
);

And finally we can write a Stored Procedure, that takes a measurement_type[] as a parameter, which is the data we want to insert. We are using the UNNEST operator to expand the array to a set of rows, and then use the INSERT ... ON CONFLICT ... DO UPDATE ... syntax for atomic updates:

CREATE OR REPLACE PROCEDURE sample.insert_or_update_measurements(p_measurements sample.measurement_type[])
LANGUAGE SQL
AS $$

    INSERT INTO sample.measurements(device_id, parameter_id, timestamp, value)
    SELECT * FROM UNNEST(p_measurements)
    ON CONFLICT (device_id, parameter_id, timestamp)
    DO UPDATE SET value = EXCLUDED.value;

$$;

We are done on the Postgres side!

What's left is using Npgsql for mapping the composite type and calling the Stored Procedure:

using Npgsql;
using NUnit.Framework;
using System;
using System.Linq;
using System.Threading.Tasks;

namespace NpgsqlTypeMappings.Example
{
    /// <summary>
    /// Maps to the Postgres "measurement_type" type.
    /// </summary>
    public class Measurement
    {
        public int DeviceId { get; set; }

        public int ParameterId { get; set; }

        public DateTime Timestamp { get; set; }

        public double Value { get; set; }
    }

    public class Tests
    {
        private static readonly string ConnectionString = @"Host=localhost;Port=5432;Database=sampledb;Pooling=false;User Id=philipp;Password=test_pwd;";

        [Test]
        public async Task BulkInsertMeasurements()
        {
            var startDate = new DateTime(2013, 1, 1, 0, 0, 0, DateTimeKind.Utc);

            var measurements = Enumerable.Range(0, 1_000_000) // We start with 1_000_000 Measurements ...
                // ... transform them into fake measurements ...
                .Select(idx => new Measurement
                {
                    DeviceId = 1,
                    ParameterId = 1,
                    Timestamp = startDate.AddSeconds(idx),
                    Value = idx
                })
                // ... and finally evaluate them:
                .ToArray();

            // Create the Parameter:
            var p_measurements = new NpgsqlParameter
            {
                ParameterName = "p_measurements",
                DataTypeName = "sample.measurement_type[]",
                Value = measurements
            };

            // Configure the Mappings:
            NpgsqlConnection.GlobalTypeMapper.MapComposite<Measurement>("sample.measurement_type");

            using (var connection = new NpgsqlConnection(ConnectionString))
            {
                await connection.OpenAsync();

                // Execute the Insert or Update Function:
                using(var cmd = new NpgsqlCommand("CALL sample.insert_or_update_measurements(@p_measurements)", connection))
                {
                    cmd.Parameters.Add(p_measurements);

                    await cmd.ExecuteNonQueryAsync();
                }
            }
        }
    }
}

Conclusion

So how well does it perform?

Bulk inserting and updating 1,000,000 measurements takes something around 20 seconds on my machine. It's well within the range of what I expected. According to this article (that's all research I did) one million inserts in a single transaction takes something around 81 seconds... so I think this method does fairly well.

Do you know a better way to do bulk updates with PostgreSQL, that doesn't include staging tables?

Let's discuss it in the GitHub Gist over at:

How to contribute

One of the easiest ways to contribute is to participate in discussions. You can also contribute by submitting pull requests.

General feedback and discussions?

Do you have questions or feedback on this article? Please create an issue on the GitHub issue tracker.

Something is wrong or missing?

There may be something wrong or missing in this article. If you want to help fixing it, then please make a Pull Request to this file on GitHub.