Milvex (milvex v0.13.0)
Milvex
An Elixir client for Milvus, the open-source vector database built for scalable similarity search.
Features
- Full gRPC client with automatic reconnection and health monitoring
- Fluent builders for schemas, indexes, and data
Installation
Add milvex to your dependencies in mix.exs:
def deps do
[
{:milvex, "~> 0.13.0"}
]
end
Quick Start
Connect to Milvus
# Start a connection
{:ok, conn} = Milvex.Connection.start_link(host: "localhost", port: 19530)
# Or with a named connection
{:ok, _} = Milvex.Connection.start_link([host: "localhost"], name: :milvus)
Start Under a Supervisor
defmodule MyApp.Application do
use Application
def start(_type, _args) do
children = [
{Milvex.Connection, [host: "localhost", port: 19530, name: MyApp.Milvus]}
]
opts = [strategy: :one_for_one, name: MyApp.Supervisor]
Supervisor.start_link(children, opts)
end
end
Then use the named connection throughout your app:
Milvex.search(MyApp.Milvus, "movies", vectors, vector_field: "embedding")
Define a Schema
alias Milvex.Schema
alias Milvex.Schema.Field
schema = Schema.build!(
name: "movies",
fields: [
Field.primary_key("id", :int64, auto_id: true),
Field.varchar("title", 512),
Field.vector("embedding", 128)
],
enable_dynamic_field: true
)
Create Collection and Index
alias Milvex.Index
# Create collection
:ok = Milvex.create_collection(conn, "movies", schema)
# Create an HNSW index
index = Index.hnsw("embedding", :cosine, m: 16, ef_construction: 256)
:ok = Milvex.create_index(conn, "movies", index)
# Load collection into memory for search
:ok = Milvex.load_collection(conn, "movies")
Insert Data
# Insert with auto-fetched schema
{:ok, result} = Milvex.insert(conn, "movies", [
%{title: "The Matrix", embedding: vector_128d()},
%{title: "Inception", embedding: vector_128d()}
])
# result.ids contains the auto-generated IDs
Search
query_vector = [0.1, 0.2, ...] # 128-dimensional vector
{:ok, results} = Milvex.search(conn, "movies", [query_vector],
vector_field: "embedding",
top_k: 10,
output_fields: ["title"],
filter: "title like \"The%\""
)
# Access results
for hit <- results.hits do
IO.puts("#{hit.id}: #{hit.fields["title"]} (score: #{hit.score})")
end
Query by Expression
{:ok, results} = Milvex.query(conn, "movies", "id > 0",
output_fields: ["id", "title"],
limit: 100
)
Connection Configuration
Milvex.Connection.start_link(
host: "localhost", # Milvus server hostname
port: 19530, # gRPC port (default: 19530, or 443 for SSL)
database: "default", # Database name
user: "root", # Username (optional)
password: "milvus", # Password (optional)
token: "api_token", # API token (alternative to user/password)
ssl: true, # Enable SSL/TLS
ssl_options: [], # SSL options for transport
timeout: 30_000 # Connection timeout in ms
)
# Or use a URI
{:ok, config} = Milvex.Config.parse_uri("https://user:pass@milvus.example.com:443/mydb")
{:ok, conn} = Milvex.Connection.start_link(config)
Index Types
# HNSW - best for high recall with good performance
Index.hnsw("field", :cosine, m: 16, ef_construction: 256)
# IVF_FLAT - good balance for medium datasets
Index.ivf_flat("field", :l2, nlist: 1024)
# AUTOINDEX - let Milvus choose optimal settings
Index.autoindex("field", :ip)
# IVF_PQ - memory efficient for large datasets
Index.ivf_pq("field", :l2, nlist: 1024, m: 8, nbits: 8)
# DiskANN - for datasets that don't fit in memory
Index.diskann("field", :l2)
Metric types: :l2, :ip, :cosine, :hamming, :jaccard
Partitions
# Create partition
:ok = Milvex.create_partition(conn, "movies", "movies_2024")
# Insert into partition
{:ok, _} = Milvex.insert(conn, "movies", data, partition_name: "movies_2024")
# Search specific partitions
{:ok, _} = Milvex.search(conn, "movies", vectors,
vector_field: "embedding",
partition_names: ["movies_2024", "movies_2023"]
)
# Load/release partitions
:ok = Milvex.load_partitions(conn, "movies", ["movies_2024"])
:ok = Milvex.release_partitions(conn, "movies", ["movies_2024"])
Error Handling
All functions return {:ok, result} or {:error, error}. Bang variants (e.g., insert!) raise on error.
case Milvex.search(conn, "movies", vectors, vector_field: "embedding") do
{:ok, results} -> process_results(results)
{:error, %Milvex.Errors.Connection{}} -> handle_connection_error()
{:error, %Milvex.Errors.Grpc{code: code}} -> handle_grpc_error(code)
{:error, %Milvex.Errors.Invalid{field: field}} -> handle_validation_error(field)
end
Development
Running Tests
# Unit tests
mix test
# Integration tests (requires Docker)
mix test.integration
Regenerating Proto Files
From the milvus-proto/proto directory:
protoc --elixir_out=one_file_per_module=true,plugins=grpc:../../lib \
--elixir_opt=package_prefix=milvex \
--elixir_opt=include_docs=true *.proto
License
MIT
Summary
Types
A collection identifier - either a string name or a module using Milvex.Collection.
Query vectors for search. Either a list of vectors (positional) or a map with atom keys (keyed).
Functions
Creates a new collection with the given schema.
Creates an index on a field in a collection.
Creates a partition in a collection.
Deletes entities from a collection by filter expression.
Describes a collection and returns its metadata.
Describes an index on a collection.
Drops (deletes) a collection.
Drops an index from a collection.
Drops a partition from a collection.
Checks if a collection exists.
Checks if a partition exists in a collection.
Performs a hybrid search combining multiple ANN searches with reranking.
Inserts data into a collection.
Lists all collections in the database.
Lists all partitions in a collection.
Loads a collection into memory for querying.
Loads partitions into memory for querying.
Queries entities from a collection using a filter expression.
Streams query results lazily using Milvus's native query iterator (PK-walk).
Releases a collection from memory.
Releases partitions from memory.
Searches for similar vectors in a collection.
Streams search results lazily using Milvus's native search-iterator V2 protocol.
Upserts data into a collection.
Types
A collection identifier - either a string name or a module using Milvex.Collection.
Query vectors for search. Either a list of vectors (positional) or a map with atom keys (keyed).
Functions
@spec create_collection(GenServer.server(), String.t(), Milvex.Schema.t(), keyword()) :: :ok | {:error, Milvex.Error.t()}
Creates a new collection with the given schema.
Parameters
- conn - Connection process (pid or registered name)
- name - Collection name
- schema - The Schema struct defining the collection structure
- opts - Options (see below)
Options
- :db_name - Database name (default: "")
- :shards_num - Number of shards (default: 1)
- :consistency_level - Consistency level (default: :Bounded)
Returns
- :ok on success
- {:error, error} on failure
Examples
schema = Schema.build!(
name: "movies",
fields: [
Field.primary_key("id", :int64, auto_id: true),
Field.varchar("title", 512),
Field.vector("embedding", 128)
]
)
:ok = Milvex.create_collection(conn, "movies", schema)
@spec create_index( GenServer.server(), collection_ref(), Milvex.Index.t() | String.t(), keyword() ) :: :ok | {:error, Milvex.Error.t()}
Creates an index on a field in a collection.
Can be called with either:
- An Index.t() struct (recommended)
- A field name string with options
Parameters
- conn - Connection process
- collection - Collection name
- index_or_field - Either a Milvex.Index.t() struct or field name string
- opts - Options
Options
- :db_name - Database name (default: "")
- :index_name - Index name (default: "", only used with field name string)
- :index_type - Index type (only used with field name string)
- :metric_type - Distance metric (only used with field name string)
- :params - Additional index parameters (only used with field name string)
Returns
- :ok on success
- {:error, error} on failure
Examples
# Using Index struct (recommended)
index = Index.hnsw("embedding", :cosine, m: 16, ef_construction: 256)
:ok = Milvex.create_index(conn, "movies", index)
# Using field name and options
:ok = Milvex.create_index(conn, "movies", "embedding",
index_type: "AUTOINDEX",
metric_type: "COSINE"
)
@spec create_partition(GenServer.server(), collection_ref(), String.t(), keyword()) :: :ok | {:error, Milvex.Error.t()}
Creates a partition in a collection.
Parameters
- conn - Connection process
- collection - Collection name
- partition_name - Name for the new partition
- opts - Options
Options
- :db_name - Database name (default: "")
Returns
- :ok on success
- {:error, error} on failure
Examples
:ok = Milvex.create_partition(conn, "movies", "movies_2024")
@spec delete(GenServer.server(), collection_ref(), String.t(), keyword()) :: {:ok, %{delete_count: integer()}} | {:error, Milvex.Error.t()}
Deletes entities from a collection by filter expression.
Parameters
- conn - Connection process
- collection - Collection name
- expr - Filter expression (e.g., "id in [1, 2, 3]" or "age > 25")
- opts - Options
Options
- :db_name - Database name (default: "")
- :partition_name - Partition to delete from (default: "")
- :consistency_level - Consistency level (default: :Bounded)
- :expr_params - Template parameters map for the filter expression
Returns
- {:ok, %{delete_count: count}} on success
- {:error, error} on failure
Examples
{:ok, result} = Milvex.delete(conn, "movies", "id in [1, 2, 3]")
{:ok, result} = Milvex.delete(conn, "movies", "year < {cutoff}",
expr_params: %{"cutoff" => 2000}
)
@spec describe_collection(GenServer.server(), collection_ref(), keyword()) :: {:ok, map()} | {:error, Milvex.Error.t()}
Describes a collection and returns its metadata.
Parameters
- conn - Connection process
- collection - Collection name or module using Milvex.Collection
- opts - Options
Options
- :db_name - Database name (default: "")
Returns
- {:ok, info} with collection info map containing:
  - :schema - The Schema struct
  - :collection_id - Collection ID
  - :shards_num - Number of shards
  - :consistency_level - Consistency level
  - :created_timestamp - Creation timestamp
  - :aliases - List of aliases
- {:error, error} on failure
@spec describe_index(GenServer.server(), collection_ref(), keyword()) :: {:ok, list()} | {:error, Milvex.Error.t()}
Describes an index on a collection.
Parameters
- conn - Connection process
- collection - Collection name
- opts - Options
Options
- :db_name - Database name (default: "")
- :field_name - Field name (default: "")
- :index_name - Index name (default: "")
Returns
- {:ok, index_descriptions} on success
- {:error, error} on failure
@spec drop_collection(GenServer.server(), collection_ref(), keyword()) :: :ok | {:error, Milvex.Error.t()}
Drops (deletes) a collection.
Parameters
- conn - Connection process
- collection - Collection name or module using Milvex.Collection
- opts - Options
Options
- :db_name - Database name (default: "")
Returns
- :ok on success
- {:error, error} on failure
@spec drop_index(GenServer.server(), collection_ref(), String.t(), keyword()) :: :ok | {:error, Milvex.Error.t()}
Drops an index from a collection.
Parameters
- conn - Connection process
- collection - Collection name
- field_name - Field name of the indexed field
- opts - Options
Options
- :db_name - Database name (default: "")
- :index_name - Index name to drop (default: "")
Returns
- :ok on success
- {:error, error} on failure
Examples
:ok = Milvex.drop_index(conn, "movies", "embedding")
:ok = Milvex.drop_index(conn, "movies", "embedding", index_name: "my_hnsw_index")
@spec drop_partition(GenServer.server(), collection_ref(), String.t(), keyword()) :: :ok | {:error, Milvex.Error.t()}
Drops a partition from a collection.
Parameters
- conn - Connection process
- collection - Collection name
- partition_name - Name of the partition to drop
- opts - Options
Options
- :db_name - Database name (default: "")
Returns
- :ok on success
- {:error, error} on failure
Examples
:ok = Milvex.drop_partition(conn, "movies", "movies_2024")
@spec has_collection(GenServer.server(), collection_ref(), keyword()) :: {:ok, boolean()} | {:error, Milvex.Error.t()}
Checks if a collection exists.
Parameters
- conn - Connection process
- collection - Collection name or module using Milvex.Collection
- opts - Options
Options
- :db_name - Database name (default: "")
Returns
- {:ok, true} if collection exists
- {:ok, false} if collection does not exist
- {:error, error} on failure
@spec has_partition(GenServer.server(), collection_ref(), String.t(), keyword()) :: {:ok, boolean()} | {:error, Milvex.Error.t()}
Checks if a partition exists in a collection.
Parameters
- conn - Connection process
- collection - Collection name
- partition_name - Partition name to check
- opts - Options
Options
- :db_name - Database name (default: "")
Returns
- {:ok, true} if partition exists
- {:ok, false} if partition does not exist
- {:error, error} on failure
Examples
{:ok, true} = Milvex.has_partition(conn, "movies", "movies_2024")
@spec hybrid_search( GenServer.server(), collection_ref(), [Milvex.AnnSearch.t()], Milvex.Ranker.WeightedRanker.t() | Milvex.Ranker.RRFRanker.t() | Milvex.Ranker.DecayRanker.t(), keyword() ) :: {:ok, Milvex.SearchResult.t()} | {:error, Milvex.Error.t()}
Performs a hybrid search combining multiple ANN searches with reranking.
Parameters
- conn - Connection process
- collection - Collection name or module
- searches - List of AnnSearch.t() structs
- ranker - WeightedRanker.t(), RRFRanker.t(), or DecayRanker.t()
- opts - Options (see below)
Options
- :output_fields - List of field names to return
- :partition_names - Partitions to search
- :consistency_level - Consistency level (default: :Bounded)
- :db_name - Database name (default: "")
- :limit - Maximum number of final results
- :offset - Number of results to skip for pagination
- :group_by_field - Scalar field name to group results by
- :group_size - Number of entities per group (default 1)
- :strict_group_size - Boolean, enforce exact group_size per group
- :round_decimal - Round scores to N decimal places (-1 to disable)
- :ignore_growing - Boolean, skip growing segments during search
Examples
{:ok, search1} = AnnSearch.new("text_dense", [text_vec], limit: 10)
{:ok, search2} = AnnSearch.new("image_dense", [image_vec], limit: 10)
{:ok, ranker} = Ranker.weighted([0.7, 0.3])
{:ok, results} = Milvex.hybrid_search(conn, "products", [search1, search2], ranker,
output_fields: ["title", "price"]
)
Pagination
:offset + :limit must be <= 16384 (Milvus server-side hard cap). Over the
cap returns {:error, %Milvex.Errors.Invalid{}}. There is no streaming
iterator for hybrid search — Milvus's iterator protocol does not support
sub-requests with reranking. For deeper pagination on hybrid queries,
narrow the filter, batch the rerank yourself, or post-process results from
multiple search_stream/4 runs.
@spec insert( GenServer.server(), collection_ref(), Milvex.Data.t() | [map() | struct()], keyword() ) :: {:ok, %{insert_count: integer(), ids: list()}} | {:error, Milvex.Error.t()}
Inserts data into a collection.
Data can be provided as:
- A list of row maps (auto-fetches schema from collection)
- A Milvex.Data struct (pre-built data)
Parameters
- conn - Connection process
- collection - Collection name
- data - Data to insert (list of maps or Data struct)
- opts - Options
Options
- :db_name - Database name (default: "")
- :partition_name - Partition to insert into (default: "")
Returns
- {:ok, %{insert_count: count, ids: ids}} on success
- {:error, error} on failure
Examples
# Insert with auto-schema fetch
{:ok, result} = Milvex.insert(conn, "movies", [
%{title: "Movie 1", embedding: [0.1, 0.2, ...]},
%{title: "Movie 2", embedding: [0.3, 0.4, ...]}
])
# Insert with pre-built Data
{:ok, data} = Data.from_rows(rows, schema)
{:ok, result} = Milvex.insert(conn, "movies", data)
@spec list_collections( GenServer.server(), keyword() ) :: {:ok, [String.t()]} | {:error, Milvex.Error.t()}
Lists all collections in the database.
Parameters
- conn - Connection process
- opts - Options
Options
- :db_name - Database name (default: "")
Returns
- {:ok, [names]} - List of collection names
- {:error, error} on failure
@spec list_partitions(GenServer.server(), collection_ref(), keyword()) :: {:ok, [String.t()]} | {:error, Milvex.Error.t()}
Lists all partitions in a collection.
Parameters
- conn - Connection process
- collection - Collection name
- opts - Options
Options
- :db_name - Database name (default: "")
Returns
- {:ok, partition_names} - List of partition names
- {:error, error} on failure
Examples
{:ok, ["_default", "movies_2024"]} = Milvex.list_partitions(conn, "movies")
@spec load_collection(GenServer.server(), collection_ref(), keyword()) :: :ok | {:error, Milvex.Error.t()}
Loads a collection into memory for querying.
Parameters
- conn - Connection process
- collection - Collection name or module using Milvex.Collection
- opts - Options
Options
- :db_name - Database name (default: "")
- :replica_number - Number of replicas (default: 1)
Returns
- :ok on success
- {:error, error} on failure
@spec load_partitions(GenServer.server(), collection_ref(), [String.t()], keyword()) :: :ok | {:error, Milvex.Error.t()}
Loads partitions into memory for querying.
Parameters
- conn - Connection process
- collection - Collection name
- partition_names - List of partition names to load
- opts - Options
Options
- :db_name - Database name (default: "")
- :replica_number - Number of replicas (default: 1)
Returns
- :ok on success
- {:error, error} on failure
Examples
:ok = Milvex.load_partitions(conn, "movies", ["movies_2024", "movies_2023"])
@spec query(GenServer.server(), collection_ref(), String.t(), keyword()) :: {:ok, Milvex.QueryResult.t()} | {:error, Milvex.Error.t()}
Queries entities from a collection using a filter expression.
Parameters
- conn - Connection process
- collection - Collection name
- expr - Filter expression (e.g., "id > 100", "status == 'active'")
- opts - Options
Options
- :db_name - Database name (default: "")
- :output_fields - List of field names to return (default: all)
- :partition_names - List of partitions to query (default: all)
- :limit - Maximum number of results
- :offset - Number of results to skip
- :order_by - Order results by one or more scalar fields. Accepts a field name (:price), a list of fields ([:price, :rating], all ascending), or an Ecto-style keyword list of directions ([desc: :price, asc: :rating]). Field names may be atoms or strings. Query-only — not supported by search/4, hybrid_search/5, search_stream/4, or query_stream/4.
- :consistency_level - Consistency level (default: :Bounded)
- :expr_params - Template parameters map for the filter expression
Returns
- {:ok, QueryResult.t()} on success
- {:error, error} on failure
Examples
{:ok, result} = Milvex.query(conn, "movies", "year > 2020",
output_fields: ["id", "title", "year"],
limit: 100
)
{:ok, result} = Milvex.query(conn, "movies", "year > 2020",
output_fields: ["id", "title", "rating"],
order_by: [desc: :rating, asc: :title],
limit: 100
)
Pagination
:offset + :limit must be <= 16384 (Milvus server-side hard cap). Over the
cap returns {:error, %Milvex.Errors.Invalid{}}. For deeper walks across a
collection, use query_stream/4, which advances by primary-key cursor and
has no offset cap. See guides/pagination_and_streaming.md.
@spec query_stream(GenServer.server(), collection_ref(), String.t(), keyword()) :: Enumerable.t()
Streams query results lazily using Milvus's native query iterator (PK-walk).
Each element of the returned Stream.t() is a single row map. Errors raise from
inside the stream rather than being yielded as {:error, _} tuples, matching the
convention of File.stream!/1, IO.stream/2, and Postgrex cursors.
Use query_stream/4 for full-collection scans or filter-walks beyond the
offset + limit <= 16384 cap on query/4.
Examples
Milvex.query_stream(conn, "movies", "year > 2000",
output_fields: ["id", "title", "year"],
batch_size: 1_000
)
|> Stream.filter(&(&1["year"] > 2010))
|> Enum.count()
Options
- :batch_size - Rows fetched per RPC (default: 1_000, max: 16_384)
- :limit - Total rows to emit before halting (default: unlimited)
- :output_fields - List of field names to return
- :partition_names - List of partitions to query
- :db_name - Database name (default: "")
- :consistency_level - Consistency level (default: :Bounded)
- :expr_params - Template parameters map for the filter expression
:offset is rejected — the iterator advances by primary-key cursor, not offset.
@spec release_collection(GenServer.server(), collection_ref(), keyword()) :: :ok | {:error, Milvex.Error.t()}
Releases a collection from memory.
Parameters
- conn - Connection process
- collection - Collection name or module using Milvex.Collection
- opts - Options
Options
- :db_name - Database name (default: "")
Returns
- :ok on success
- {:error, error} on failure
@spec release_partitions( GenServer.server(), collection_ref(), [String.t()], keyword() ) :: :ok | {:error, Milvex.Error.t()}
Releases partitions from memory.
Parameters
- conn - Connection process
- collection - Collection name
- partition_names - List of partition names to release
- opts - Options
Options
- :db_name - Database name (default: "")
Returns
- :ok on success
- {:error, error} on failure
Examples
:ok = Milvex.release_partitions(conn, "movies", ["movies_2024"])
@spec search(GenServer.server(), collection_ref(), vector_queries(), keyword()) :: {:ok, Milvex.SearchResult.t()} | {:error, Milvex.Error.t()}
Searches for similar vectors in a collection.
Parameters
- conn - Connection process
- collection - Collection name
- vectors - Query vectors: list of vectors or map with atom keys
- opts - Options (:vector_field is required)
Options
- :vector_field - (required) Name of the vector field to search
- :top_k - Number of results per query (default: 10)
- :output_fields - List of field names to include in results
- :filter - Filter expression string (e.g., "year > 2020")
- :metric_type - Similarity metric (:L2, :IP, :COSINE)
- :search_params - Map of search parameters (e.g., %{"nprobe" => 10})
- :partition_names - List of partition names to search
- :db_name - Database name (default: "")
- :consistency_level - Consistency level (default: :Bounded)
- :highlight - A Milvex.Highlighter.t() to enable search result highlighting
- :expr_params - Template parameters map for the filter expression
- :offset - Number of results to skip for pagination (limit + offset <= 16384)
- :group_by_field - Scalar field name to group results by
- :group_size - Number of entities per group (default 1)
- :strict_group_size - Boolean, enforce exact group_size per group
- :round_decimal - Round scores to N decimal places (-1 to disable)
- :ignore_growing - Boolean, skip growing segments during search
Returns
- {:ok, SearchResult.t()} on success
- {:error, error} on failure
Examples
{:ok, result} = Milvex.search(conn, "movies", [[0.1, 0.2, 0.3, ...]],
vector_field: "embedding",
top_k: 10,
output_fields: ["title", "year"]
)
{:ok, result} = Milvex.search(conn, "movies", [[0.1, 0.2, 0.3, ...]],
vector_field: "embedding",
filter: "year > {min_year} AND genre IN {genres}",
expr_params: %{"min_year" => 2020, "genres" => ["action", "sci-fi"]}
)
# Multiple queries with filter
{:ok, result} = Milvex.search(conn, "movies", [query1, query2],
vector_field: "embedding",
top_k: 5,
filter: "year > 2020"
)
# Named queries - results keyed by same atoms
{:ok, result} = Milvex.search(conn, "movies",
%{matrix_like: embedding1, inception_like: embedding2},
vector_field: "embedding",
top_k: 5
)
result.hits[:matrix_like] # => [%Hit{}, ...]
result.hits[:inception_like] # => [%Hit{}, ...]
Pagination
:offset + :limit (or :offset + :top_k) must be <= 16384 (Milvus
server-side hard cap). Over the cap returns {:error, %Milvex.Errors.Invalid{}}.
For deeper paging on a single-vector search, use search_stream/4, which is
backed by Milvus's iterator V2 protocol and has no offset cap.
See guides/pagination_and_streaming.md.
@spec search_stream(GenServer.server(), collection_ref(), [number()], keyword()) :: Enumerable.t()
Streams search results lazily using Milvus's native search-iterator V2 protocol.
Each element of the returned Stream.t() is a single Milvex.SearchResult.Hit.
Errors raise from inside the stream rather than being yielded as {:error, _} tuples,
matching the convention of File.stream!/1, IO.stream/2, and Postgrex cursors.
Iterator mode requires Milvus >= 2.4 and accepts only a single query vector.
For multi-vector or named queries, use search/4 with :offset/:limit.
Examples
Milvex.search_stream(conn, "movies", [0.1, 0.2, 0.3, 0.4],
vector_field: "embedding",
filter: "year > 2020",
batch_size: 500
)
|> Stream.take(10_000)
|> Enum.to_list()
Options
See module docs and the design spec for the full option matrix. Notable rejections:
:offset, :top_k, :group_by_field, :group_size, :strict_group_size all raise
Milvex.Errors.Invalid — these are server-iterator constraints, not implementation gaps.
@spec upsert( GenServer.server(), collection_ref(), Milvex.Data.t() | [map() | struct()], keyword() ) :: {:ok, %{upsert_count: integer(), ids: list()}} | {:error, Milvex.Error.t()}
Upserts data into a collection.
Works the same as insert/4 but updates existing entities with matching primary keys.
Parameters
- conn - Connection process
- collection - Collection name
- data - Data to upsert (list of maps or Data struct)
- opts - Options
Options
- :db_name - Database name (default: "")
- :partition_name - Partition to upsert into (default: "")
- :partial_update - When true, only the fields present in each row are updated and omitted fields keep their existing values. The primary key must be present in every row, and all rows must share the same key set. When false (default), omitted fields are overwritten to null, matching the original Milvus upsert semantics.
Returns
- {:ok, %{upsert_count: count, ids: ids}} on success
- {:error, error} on failure