I'm writing a custom mapping for pg_vector
extension, but I'm getting this error 08P01: insufficient data left in the message in custom mapping
whenever I try to write to the database.
I have a psql (15.1 (Debian 15.1-1.pgdg110+1))
database with an installed pg_vector
extension.
I have a test database, vectortest
, with test table items
with a single field of type vector(3)
which is more or less a float32 array.
Column | Type | Collation | Nullable | Default
-----------+-----------+-----------+----------+---------
embedding | vector(3) | | |
I know for sure that it works. I could add new records with SQL
await using (var cmd = dataSource.CreateCommand("INSERT INTO items VALUES ('[1,2,3]')"))
{
cmd.ExecuteNonQuery();
}
I have an implementation for TypeHandlerResolverFactory
and TypeHandlerResolver
, which return my custom handler PgVectorTypeHandler : NpgsqlSimpleTypeHandler<Vector>
The vector could be read and written to database as binary, it is basically (ushort, ushort, float, float, ...)
My mapping for read works great.
public override Vector Read(NpgsqlReadBuffer buf, int len, FieldDescription? fieldDescription = null)
{
var dim = buf.ReadUInt16();
var unused = buf.ReadUInt16();
var values = new float[dim];
for (var i = 0; i < dim; i++)
{
values[i] = buf.ReadSingle();
}
return new Vector(values);
}
But my mapper for write failed with 08P01: insufficient data left in the message in custom mapping
exception and FATAL: invalid frontend message type 6
on server side.
public override void Write(Vector value, NpgsqlWriteBuffer buf, NpgsqlParameter? parameter)
{
buf.WriteUInt16((ushort)value.Values.Length);
buf.WriteUInt16(0);
buf.WriteSingle(value.Values[0]);
buf.WriteSingle(value.Values[1]);
buf.WriteSingle(value.Values[2]);
// here was for-loop but I replaced it to simplify
}
This is how I execute the command
await using (var cmd = dataSource.CreateCommand("INSERT INTO items VALUES (@vec)"))
{
var vector1 = new Vector(new[] { 1f, 2f, 3f });
var par = cmd.CreateParameter();
par.ParameterName = "vec";
par.Value = vector1;
cmd.Parameters.Add(par);
//cmd.Parameters.AddWithValue(new Vector(new[] { 4f, 5f, 6f }));
cmd.ExecuteNonQuery();
}
I tried positional, named paramters - it does not matter. I'm out of ideas about where I should look and what I should check.
I misunderstood how NpgsqlTypeHandler<T>
should be implemented.
NpgsqlTypeHandler, on top of Read/Write methods, wants you to implement two non-generic methods, ValidateObjectAndGetLength
and WriteObjectWithLength
.
I didn't get why we need WriteObjectWithLength
and Write
methods, but actually, it's WriteObjectWithLength
who called by NpgsqlCommand.Write
.
internal class PgVectorTypeHandler : NpgsqlTypeHandler<Vector>
{
// WRONG IMPLEMENTATION
public override Task WriteObjectWithLength(object? value, NpgsqlWriteBuffer buf, NpgsqlLengthCache? lengthCache,
NpgsqlParameter parameter, bool async, CancellationToken cancellationToken = new CancellationToken())
{
Write(value as Vector ?? throw new InvalidOperationException("value type is invalid"), buf, lengthCache, parameter, async, cancellationToken);
return Task.CompletedTask;
}
...
}
But actually, that was not enough. My second mistake was that I didn't know you should write Int32 (4 bytes) value with how many bytes your parameter value has.
// BETTER IMPLEMENTATION
public override Task WriteObjectWithLength(object? value, NpgsqlWriteBuffer buf, NpgsqlLengthCache? lengthCache,
NpgsqlParameter parameter, bool async, CancellationToken cancellationToken = new CancellationToken())
{
var valueVector = value as Vector ?? throw new InvalidOperationException("value type is invalid");
buf.WriteInt32(ValidateAndGetLength(valueVector, ref lengthCache, parameter)); // required
Write(valueVector, buf, lengthCache, parameter, async, cancellationToken);
return Task.CompletedTask;
}
And then, I started to explore a base NpgsqlTypeHandler
class a little bit more and found out that there is an implementation that does the same thing, even better. So the best solution here
// PROPER IMPLEMENTATION
public override Task WriteObjectWithLength(object? value, NpgsqlWriteBuffer buf, NpgsqlLengthCache? lengthCache,
NpgsqlParameter? parameter, bool async, CancellationToken cancellationToken = new CancellationToken())
{
return base.WriteWithLength(value as Vector, buf, lengthCache, parameter, async, cancellationToken);
}
Same goes for length as well
public override int ValidateObjectAndGetLength(object value, ref NpgsqlLengthCache? lengthCache, NpgsqlParameter? parameter)
{
return base.ValidateAndGetLength<Vector>(value as Vector, ref lengthCache, parameter);
}
That will do some logic to write length inside and eventually call Write
method to write only value.