Milvex (milvex v0.13.0)

Copy Markdown

Milvex

An Elixir client for Milvus, the open-source vector database built for scalable similarity search.

Features

  • Full gRPC client with automatic reconnection and health monitoring
  • Fluent builders for schemas, indexes, and data

Installation

Add milvex to your dependencies in mix.exs:

def deps do
  [
    {:milvex, "~> 0.4.1"}
  ]
end

Quick Start

Connect to Milvus

# Start a connection
{:ok, conn} = Milvex.Connection.start_link(host: "localhost", port: 19530)

# Or with a named connection
{:ok, _} = Milvex.Connection.start_link([host: "localhost"], name: :milvus)

Start Under a Supervisor

defmodule MyApp.Application do
  use Application

  def start(_type, _args) do
    children = [
      {Milvex.Connection, [host: "localhost", port: 19530, name: MyApp.Milvus]}
    ]

    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

Then use the named connection throughout your app:

Milvex.search(MyApp.Milvus, "movies", vectors, vector_field: "embedding")

Define a Schema

alias Milvex.Schema
alias Milvex.Schema.Field

schema = Schema.build!(
  name: "movies",
  fields: [
    Field.primary_key("id", :int64, auto_id: true),
    Field.varchar("title", 512),
    Field.vector("embedding", 128)
  ],
  enable_dynamic_field: true
)

Create Collection and Index

alias Milvex.Index

# Create collection
:ok = Milvex.create_collection(conn, "movies", schema)

# Create an HNSW index
index = Index.hnsw("embedding", :cosine, m: 16, ef_construction: 256)
:ok = Milvex.create_index(conn, "movies", index)

# Load collection into memory for search
:ok = Milvex.load_collection(conn, "movies")

Insert Data

# Insert with auto-fetched schema
{:ok, result} = Milvex.insert(conn, "movies", [
  %{title: "The Matrix", embedding: vector_128d()},
  %{title: "Inception", embedding: vector_128d()}
])

# result.ids contains the auto-generated IDs
query_vector = [0.1, 0.2, ...]  # 128-dimensional vector

{:ok, results} = Milvex.search(conn, "movies", [query_vector],
  vector_field: "embedding",
  top_k: 10,
  output_fields: ["title"],
  filter: "title like \"The%\""
)

# Access results
for hit <- results.hits do
  IO.puts("#{hit.id}: #{hit.fields["title"]} (score: #{hit.score})")
end

Query by Expression

{:ok, results} = Milvex.query(conn, "movies", "id > 0",
  output_fields: ["id", "title"],
  limit: 100
)

Connection Configuration

Milvex.Connection.start_link(
  host: "localhost",        # Milvus server hostname
  port: 19530,              # gRPC port (default: 19530, or 443 for SSL)
  database: "default",      # Database name
  user: "root",             # Username (optional)
  password: "milvus",       # Password (optional)
  token: "api_token",       # API token (alternative to user/password)
  ssl: true,                # Enable SSL/TLS
  ssl_options: [],          # SSL options for transport
  timeout: 30_000           # Connection timeout in ms
)

# Or use a URI
{:ok, config} = Milvex.Config.parse_uri("https://user:pass@milvus.example.com:443/mydb")
{:ok, conn} = Milvex.Connection.start_link(config)

Index Types

# HNSW - best for high recall with good performance
Index.hnsw("field", :cosine, m: 16, ef_construction: 256)

# IVF_FLAT - good balance for medium datasets
Index.ivf_flat("field", :l2, nlist: 1024)

# AUTOINDEX - let Milvus choose optimal settings
Index.autoindex("field", :ip)

# IVF_PQ - memory efficient for large datasets
Index.ivf_pq("field", :l2, nlist: 1024, m: 8, nbits: 8)

# DiskANN - for datasets that don't fit in memory
Index.diskann("field", :l2)

Metric types: :l2, :ip, :cosine, :hamming, :jaccard

Partitions

# Create partition
:ok = Milvex.create_partition(conn, "movies", "movies_2024")

# Insert into partition
{:ok, _} = Milvex.insert(conn, "movies", data, partition_name: "movies_2024")

# Search specific partitions
{:ok, _} = Milvex.search(conn, "movies", vectors,
  vector_field: "embedding",
  partition_names: ["movies_2024", "movies_2023"]
)

# Load/release partitions
:ok = Milvex.load_partitions(conn, "movies", ["movies_2024"])
:ok = Milvex.release_partitions(conn, "movies", ["movies_2024"])

Error Handling

All functions return {:ok, result} or {:error, error}. Bang variants (e.g., insert!) raise on error.

case Milvex.search(conn, "movies", vectors, vector_field: "embedding") do
  {:ok, results} -> process_results(results)
  {:error, %Milvex.Errors.Connection{}} -> handle_connection_error()
  {:error, %Milvex.Errors.Grpc{code: code}} -> handle_grpc_error(code)
  {:error, %Milvex.Errors.Invalid{field: field}} -> handle_validation_error(field)
end

Development

Running Tests

# Unit tests
mix test

# Integration tests (requires Docker)
mix test.integration

Regenerating Proto Files

From the milvus-proto/proto directory:

protoc --elixir_out=one_file_per_module=true,plugins=grpc:../../lib \
       --elixir_opt=package_prefix=milvex \
       --elixir_opt=include_docs=true *.proto

License

MIT

Summary

Types

A collection identifier - either a string name or a module using Milvex.Collection.

Query vectors for search. Either a list of vectors (positional) or a map with atom keys (keyed).

Functions

Creates a new collection with the given schema.

Creates an index on a field in a collection.

Deletes entities from a collection by filter expression.

Describes a collection and returns its metadata.

Describes an index on a collection.

Drops (deletes) a collection.

Drops an index from a collection.

Checks if a collection exists.

Checks if a partition exists in a collection.

Performs a hybrid search combining multiple ANN searches with reranking.

Inserts data into a collection.

Lists all collections in the database.

Lists all partitions in a collection.

Loads a collection into memory for querying.

Loads partitions into memory for querying.

Queries entities from a collection using a filter expression.

Streams query results lazily using Milvus's native query iterator (PK-walk).

Releases a collection from memory.

Searches for similar vectors in a collection.

Streams search results lazily using Milvus's native search-iterator V2 protocol.

Upserts data into a collection.

Types

collection_ref()

@type collection_ref() :: String.t() | module()

A collection identifier - either a string name or a module using Milvex.Collection.

vector_queries()

@type vector_queries() :: [[number()]] | %{required(atom()) => [number()]}

Query vectors for search. Either a list of vectors (positional) or a map with atom keys (keyed).

Functions

create_collection(conn, name, schema, opts \\ [])

@spec create_collection(GenServer.server(), String.t(), Milvex.Schema.t(), keyword()) ::
  :ok | {:error, Milvex.Error.t()}

Creates a new collection with the given schema.

Parameters

  • conn - Connection process (pid or registered name)
  • name - Collection name
  • schema - The Schema struct defining the collection structure
  • opts - Options (see below)

Options

  • :db_name - Database name (default: "")
  • :shards_num - Number of shards (default: 1)
  • :consistency_level - Consistency level (default: :Bounded)

Returns

  • :ok on success
  • {:error, error} on failure

Examples

schema = Schema.build!(
  name: "movies",
  fields: [
    Field.primary_key("id", :int64, auto_id: true),
    Field.varchar("title", 512),
    Field.vector("embedding", 128)
  ]
)

:ok = Milvex.create_collection(conn, "movies", schema)

create_index(conn, collection, index_or_field, opts \\ [])

@spec create_index(
  GenServer.server(),
  collection_ref(),
  Milvex.Index.t() | String.t(),
  keyword()
) ::
  :ok | {:error, Milvex.Error.t()}

Creates an index on a field in a collection.

Can be called with either:

  • An Index.t() struct (recommended)
  • A field name string with options

Parameters

  • conn - Connection process
  • collection - Collection name
  • index_or_field - Either a Milvex.Index.t() struct or field name string
  • opts - Options

Options

  • :db_name - Database name (default: "")
  • :index_name - Index name (default: "", only used with field name string)
  • :index_type - Index type (only used with field name string)
  • :metric_type - Distance metric (only used with field name string)
  • :params - Additional index parameters (only used with field name string)

Returns

  • :ok on success
  • {:error, error} on failure

Examples

# Using Index struct (recommended)
index = Index.hnsw("embedding", :cosine, m: 16, ef_construction: 256)
:ok = Milvex.create_index(conn, "movies", index)

# Using field name and options
:ok = Milvex.create_index(conn, "movies", "embedding",
  index_type: "AUTOINDEX",
  metric_type: "COSINE"
)

create_partition(conn, collection, partition_name, opts \\ [])

@spec create_partition(GenServer.server(), collection_ref(), String.t(), keyword()) ::
  :ok | {:error, Milvex.Error.t()}

Creates a partition in a collection.

Parameters

  • conn - Connection process
  • collection - Collection name
  • partition_name - Name for the new partition
  • opts - Options

Options

  • :db_name - Database name (default: "")

Returns

  • :ok on success
  • {:error, error} on failure

Examples

:ok = Milvex.create_partition(conn, "movies", "movies_2024")

delete(conn, collection, expr, opts \\ [])

@spec delete(GenServer.server(), collection_ref(), String.t(), keyword()) ::
  {:ok, %{delete_count: integer()}} | {:error, Milvex.Error.t()}

Deletes entities from a collection by filter expression.

Parameters

  • conn - Connection process
  • collection - Collection name
  • expr - Filter expression (e.g., "id in [1, 2, 3]" or "age > 25")
  • opts - Options

Options

  • :db_name - Database name (default: "")
  • :partition_name - Partition to delete from (default: "")
  • :consistency_level - Consistency level (default: :Bounded)
  • :expr_params - Template parameters map for the filter expression

Returns

  • {:ok, %{delete_count: count}} on success
  • {:error, error} on failure

Examples

{:ok, result} = Milvex.delete(conn, "movies", "id in [1, 2, 3]")

{:ok, result} = Milvex.delete(conn, "movies", "year < {cutoff}",
  expr_params: %{"cutoff" => 2000}
)

describe_collection(conn, collection, opts \\ [])

@spec describe_collection(GenServer.server(), collection_ref(), keyword()) ::
  {:ok, map()} | {:error, Milvex.Error.t()}

Describes a collection and returns its metadata.

Parameters

  • conn - Connection process
  • collection - Collection name or module using Milvex.Collection
  • opts - Options

Options

  • :db_name - Database name (default: "")

Returns

  • {:ok, info} with collection info map containing:
    • :schema - The Schema struct
    • :collection_id - Collection ID
    • :shards_num - Number of shards
    • :consistency_level - Consistency level
    • :created_timestamp - Creation timestamp
    • :aliases - List of aliases
  • {:error, error} on failure

describe_index(conn, collection, opts \\ [])

@spec describe_index(GenServer.server(), collection_ref(), keyword()) ::
  {:ok, list()} | {:error, Milvex.Error.t()}

Describes an index on a collection.

Parameters

  • conn - Connection process
  • collection - Collection name
  • opts - Options

Options

  • :db_name - Database name (default: "")
  • :field_name - Field name (default: "")
  • :index_name - Index name (default: "")

Returns

  • {:ok, index_descriptions} on success
  • {:error, error} on failure

drop_collection(conn, collection, opts \\ [])

@spec drop_collection(GenServer.server(), collection_ref(), keyword()) ::
  :ok | {:error, Milvex.Error.t()}

Drops (deletes) a collection.

Parameters

  • conn - Connection process
  • collection - Collection name or module using Milvex.Collection
  • opts - Options

Options

  • :db_name - Database name (default: "")

Returns

  • :ok on success
  • {:error, error} on failure

drop_index(conn, collection, field_name, opts \\ [])

@spec drop_index(GenServer.server(), collection_ref(), String.t(), keyword()) ::
  :ok | {:error, Milvex.Error.t()}

Drops an index from a collection.

Parameters

  • conn - Connection process
  • collection - Collection name
  • field_name - Field name of the indexed field
  • opts - Options

Options

  • :db_name - Database name (default: "")
  • :index_name - Index name to drop (default: "")

Returns

  • :ok on success
  • {:error, error} on failure

Examples

:ok = Milvex.drop_index(conn, "movies", "embedding")
:ok = Milvex.drop_index(conn, "movies", "embedding", index_name: "my_hnsw_index")

drop_partition(conn, collection, partition_name, opts \\ [])

@spec drop_partition(GenServer.server(), collection_ref(), String.t(), keyword()) ::
  :ok | {:error, Milvex.Error.t()}

Drops a partition from a collection.

Parameters

  • conn - Connection process
  • collection - Collection name
  • partition_name - Name of the partition to drop
  • opts - Options

Options

  • :db_name - Database name (default: "")

Returns

  • :ok on success
  • {:error, error} on failure

Examples

:ok = Milvex.drop_partition(conn, "movies", "movies_2024")

has_collection(conn, collection, opts \\ [])

@spec has_collection(GenServer.server(), collection_ref(), keyword()) ::
  {:ok, boolean()} | {:error, Milvex.Error.t()}

Checks if a collection exists.

Parameters

  • conn - Connection process
  • collection - Collection name or module using Milvex.Collection
  • opts - Options

Options

  • :db_name - Database name (default: "")

Returns

  • {:ok, true} if collection exists
  • {:ok, false} if collection does not exist
  • {:error, error} on failure

has_partition(conn, collection, partition_name, opts \\ [])

@spec has_partition(GenServer.server(), collection_ref(), String.t(), keyword()) ::
  {:ok, boolean()} | {:error, Milvex.Error.t()}

Checks if a partition exists in a collection.

Parameters

  • conn - Connection process
  • collection - Collection name
  • partition_name - Partition name to check
  • opts - Options

Options

  • :db_name - Database name (default: "")

Returns

  • {:ok, true} if partition exists
  • {:ok, false} if partition does not exist
  • {:error, error} on failure

Examples

{:ok, true} = Milvex.has_partition(conn, "movies", "movies_2024")

hybrid_search(conn, collection, searches, ranker, opts \\ [])

Performs a hybrid search combining multiple ANN searches with reranking.

Parameters

  • conn - Connection process
  • collection - Collection name or module
  • searches - List of AnnSearch.t() structs
  • ranker - WeightedRanker.t(), RRFRanker.t(), or DecayRanker.t()
  • opts - Options (see below)

Options

  • :output_fields - List of field names to return
  • :partition_names - Partitions to search
  • :consistency_level - Consistency level (default: :Bounded)
  • :db_name - Database name (default: "")
  • :limit - Maximum number of final results
  • :offset - Number of results to skip for pagination
  • :group_by_field - Scalar field name to group results by
  • :group_size - Number of entities per group (default 1)
  • :strict_group_size - Boolean, enforce exact group_size per group
  • :round_decimal - Round scores to N decimal places (-1 to disable)
  • :ignore_growing - Boolean, skip growing segments during search

Examples

{:ok, search1} = AnnSearch.new("text_dense", [text_vec], limit: 10)
{:ok, search2} = AnnSearch.new("image_dense", [image_vec], limit: 10)
{:ok, ranker} = Ranker.weighted([0.7, 0.3])

{:ok, results} = Milvex.hybrid_search(conn, "products", [search1, search2], ranker,
  output_fields: ["title", "price"]
)

Pagination

:offset + :limit must be <= 16384 (Milvus server-side hard cap). Over the cap returns {:error, %Milvex.Errors.Invalid{}}. There is no streaming iterator for hybrid search — Milvus's iterator protocol does not support sub-requests with reranking. For deeper pagination on hybrid queries, narrow the filter, batch the rerank yourself, or post-process results from multiple search_stream/4 runs.

insert(conn, collection, data, opts \\ [])

@spec insert(
  GenServer.server(),
  collection_ref(),
  Milvex.Data.t() | [map() | struct()],
  keyword()
) ::
  {:ok, %{insert_count: integer(), ids: list()}} | {:error, Milvex.Error.t()}

Inserts data into a collection.

Data can be provided as:

  • A list of row maps (auto-fetches schema from collection)
  • A Milvex.Data struct (pre-built data)

Parameters

  • conn - Connection process
  • collection - Collection name
  • data - Data to insert (list of maps or Data struct)
  • opts - Options

Options

  • :db_name - Database name (default: "")
  • :partition_name - Partition to insert into (default: "")

Returns

  • {:ok, %{insert_count: count, ids: ids}} on success
  • {:error, error} on failure

Examples

# Insert with auto-schema fetch
{:ok, result} = Milvex.insert(conn, "movies", [
  %{title: "Movie 1", embedding: [0.1, 0.2, ...]},
  %{title: "Movie 2", embedding: [0.3, 0.4, ...]}
])

# Insert with pre-built Data
{:ok, data} = Data.from_rows(rows, schema)
{:ok, result} = Milvex.insert(conn, "movies", data)

list_collections(conn, opts \\ [])

@spec list_collections(
  GenServer.server(),
  keyword()
) :: {:ok, [String.t()]} | {:error, Milvex.Error.t()}

Lists all collections in the database.

Parameters

  • conn - Connection process
  • opts - Options

Options

  • :db_name - Database name (default: "")

Returns

  • {:ok, [names]} - List of collection names
  • {:error, error} on failure

list_partitions(conn, collection, opts \\ [])

@spec list_partitions(GenServer.server(), collection_ref(), keyword()) ::
  {:ok, [String.t()]} | {:error, Milvex.Error.t()}

Lists all partitions in a collection.

Parameters

  • conn - Connection process
  • collection - Collection name
  • opts - Options

Options

  • :db_name - Database name (default: "")

Returns

  • {:ok, partition_names} - List of partition names
  • {:error, error} on failure

Examples

{:ok, ["_default", "movies_2024"]} = Milvex.list_partitions(conn, "movies")

load_collection(conn, collection, opts \\ [])

@spec load_collection(GenServer.server(), collection_ref(), keyword()) ::
  :ok | {:error, Milvex.Error.t()}

Loads a collection into memory for querying.

Parameters

  • conn - Connection process
  • collection - Collection name or module using Milvex.Collection
  • opts - Options

Options

  • :db_name - Database name (default: "")
  • :replica_number - Number of replicas (default: 1)

Returns

  • :ok on success
  • {:error, error} on failure

load_partitions(conn, collection, partition_names, opts \\ [])

@spec load_partitions(GenServer.server(), collection_ref(), [String.t()], keyword()) ::
  :ok | {:error, Milvex.Error.t()}

Loads partitions into memory for querying.

Parameters

  • conn - Connection process
  • collection - Collection name
  • partition_names - List of partition names to load
  • opts - Options

Options

  • :db_name - Database name (default: "")
  • :replica_number - Number of replicas (default: 1)

Returns

  • :ok on success
  • {:error, error} on failure

Examples

:ok = Milvex.load_partitions(conn, "movies", ["movies_2024", "movies_2023"])

query(conn, collection, expr, opts \\ [])

@spec query(GenServer.server(), collection_ref(), String.t(), keyword()) ::
  {:ok, Milvex.QueryResult.t()} | {:error, Milvex.Error.t()}

Queries entities from a collection using a filter expression.

Parameters

  • conn - Connection process
  • collection - Collection name
  • expr - Filter expression (e.g., "id > 100", "status == 'active'")
  • opts - Options

Options

  • :db_name - Database name (default: "")
  • :output_fields - List of field names to return (default: all)
  • :partition_names - List of partitions to query (default: all)
  • :limit - Maximum number of results
  • :offset - Number of results to skip
  • :order_by - Order results by one or more scalar fields. Accepts a field name (:price), a list of fields ([:price, :rating], all ascending), or an Ecto-style keyword list of directions ([desc: :price, asc: :rating]). Field names may be atoms or strings. Query-only — not supported by search/4, hybrid_search/5, search_stream/4, or query_stream/4.
  • :consistency_level - Consistency level (default: :Bounded)
  • :expr_params - Template parameters map for the filter expression

Returns

  • {:ok, QueryResult.t()} on success
  • {:error, error} on failure

Examples

{:ok, result} = Milvex.query(conn, "movies", "year > 2020",
  output_fields: ["id", "title", "year"],
  limit: 100
)

{:ok, result} = Milvex.query(conn, "movies", "year > 2020",
  output_fields: ["id", "title", "rating"],
  order_by: [desc: :rating, asc: :title],
  limit: 100
)

Pagination

:offset + :limit must be <= 16384 (Milvus server-side hard cap). Over the cap returns {:error, %Milvex.Errors.Invalid{}}. For deeper walks across a collection, use query_stream/4, which advances by primary-key cursor and has no offset cap. See guides/pagination_and_streaming.md.

query_stream(conn, collection, expr, opts \\ [])

@spec query_stream(GenServer.server(), collection_ref(), String.t(), keyword()) ::
  Enumerable.t()

Streams query results lazily using Milvus's native query iterator (PK-walk).

Each element of the returned Stream.t() is a single row map. Errors raise from inside the stream rather than being yielded as {:error, _} tuples, matching the convention of File.stream!/1, IO.stream/2, and Postgrex cursors.

Use query_stream/4 for full-collection scans or filter-walks beyond the offset + limit <= 16384 cap on query/4.

Examples

Milvex.query_stream(conn, "movies", "year > 2000",
  output_fields: ["id", "title", "year"],
  batch_size: 1_000
)
|> Stream.filter(&(&1["year"] > 2010))
|> Enum.count()

Options

  • :batch_size - Rows fetched per RPC (default: 1_000, max: 16_384)
  • :limit - Total rows to emit before halting (default: unlimited)
  • :output_fields - List of field names to return
  • :partition_names - List of partitions to query
  • :db_name - Database name (default: "")
  • :consistency_level - Consistency level (default: :Bounded)
  • :expr_params - Template parameters map for the filter expression

:offset is rejected — the iterator advances by primary-key cursor, not offset.

release_collection(conn, collection, opts \\ [])

@spec release_collection(GenServer.server(), collection_ref(), keyword()) ::
  :ok | {:error, Milvex.Error.t()}

Releases a collection from memory.

Parameters

  • conn - Connection process
  • collection - Collection name or module using Milvex.Collection
  • opts - Options

Options

  • :db_name - Database name (default: "")

Returns

  • :ok on success
  • {:error, error} on failure

release_partitions(conn, collection, partition_names, opts \\ [])

@spec release_partitions(
  GenServer.server(),
  collection_ref(),
  [String.t()],
  keyword()
) ::
  :ok | {:error, Milvex.Error.t()}

Releases partitions from memory.

Parameters

  • conn - Connection process
  • collection - Collection name
  • partition_names - List of partition names to release
  • opts - Options

Options

  • :db_name - Database name (default: "")

Returns

  • :ok on success
  • {:error, error} on failure

Examples

:ok = Milvex.release_partitions(conn, "movies", ["movies_2024"])

search(conn, collection, vectors, opts \\ [])

@spec search(GenServer.server(), collection_ref(), vector_queries(), keyword()) ::
  {:ok, Milvex.SearchResult.t()} | {:error, Milvex.Error.t()}

Searches for similar vectors in a collection.

Parameters

  • conn - Connection process
  • collection - Collection name
  • vectors - Query vectors: list of vectors or map with atom keys
  • opts - Options (:vector_field is required)

Options

  • :vector_field - (required) Name of the vector field to search
  • :top_k - Number of results per query (default: 10)
  • :output_fields - List of field names to include in results
  • :filter - Filter expression string (e.g., "year > 2020")
  • :metric_type - Similarity metric (:L2, :IP, :COSINE)
  • :search_params - Map of search parameters (e.g., %{"nprobe" => 10})
  • :partition_names - List of partition names to search
  • :db_name - Database name (default: "")
  • :consistency_level - Consistency level (default: :Bounded)
  • :highlight - A Milvex.Highlighter.t() to enable search result highlighting
  • :expr_params - Template parameters map for the filter expression
  • :offset - Number of results to skip for pagination (limit + offset <= 16384)
  • :group_by_field - Scalar field name to group results by
  • :group_size - Number of entities per group (default 1)
  • :strict_group_size - Boolean, enforce exact group_size per group
  • :round_decimal - Round scores to N decimal places (-1 to disable)
  • :ignore_growing - Boolean, skip growing segments during search

Returns

  • {:ok, SearchResult.t()} on success
  • {:error, error} on failure

Examples

{:ok, result} = Milvex.search(conn, "movies", [[0.1, 0.2, 0.3, ...]],
  vector_field: "embedding",
  top_k: 10,
  output_fields: ["title", "year"]
)

{:ok, result} = Milvex.search(conn, "movies", [[0.1, 0.2, 0.3, ...]],
  vector_field: "embedding",
  filter: "year > {min_year} AND genre IN {genres}",
  expr_params: %{"min_year" => 2020, "genres" => ["action", "sci-fi"]}
)

# Multiple queries with filter
{:ok, result} = Milvex.search(conn, "movies", [query1, query2],
  vector_field: "embedding",
  top_k: 5,
  filter: "year > 2020"
)

# Named queries - results keyed by same atoms
{:ok, result} = Milvex.search(conn, "movies",
  %{matrix_like: embedding1, inception_like: embedding2},
  vector_field: "embedding",
  top_k: 5
)
result.hits[:matrix_like]     # => [%Hit{}, ...]
result.hits[:inception_like]  # => [%Hit{}, ...]

Pagination

:offset + :limit (or :offset + :top_k) must be <= 16384 (Milvus server-side hard cap). Over the cap returns {:error, %Milvex.Errors.Invalid{}}. For deeper paging on a single-vector search, use search_stream/4, which is backed by Milvus's iterator V2 protocol and has no offset cap. See guides/pagination_and_streaming.md.

search_stream(conn, collection, vector, opts \\ [])

@spec search_stream(GenServer.server(), collection_ref(), [number()], keyword()) ::
  Enumerable.t()

Streams search results lazily using Milvus's native search-iterator V2 protocol.

Each element of the returned Stream.t() is a single Milvex.SearchResult.Hit. Errors raise from inside the stream rather than being yielded as {:error, _} tuples, matching the convention of File.stream!/1, IO.stream/2, and Postgrex cursors.

Iterator mode requires Milvus >= 2.4 and accepts only a single query vector. For multi-vector or named queries, use search/4 with :offset/:limit.

Examples

Milvex.search_stream(conn, "movies", [0.1, 0.2, 0.3, 0.4],
  vector_field: "embedding",
  filter: "year > 2020",
  batch_size: 500
)
|> Stream.take(10_000)
|> Enum.to_list()

Options

See module docs and the design spec for the full option matrix. Notable rejections: :offset, :top_k, :group_by_field, :group_size, :strict_group_size all raise Milvex.Errors.Invalid — these are server-iterator constraints, not implementation gaps.

upsert(conn, collection, data, opts \\ [])

@spec upsert(
  GenServer.server(),
  collection_ref(),
  Milvex.Data.t() | [map() | struct()],
  keyword()
) ::
  {:ok, %{upsert_count: integer(), ids: list()}} | {:error, Milvex.Error.t()}

Upserts data into a collection.

Works the same as insert/4 but updates existing entities with matching primary keys.

Parameters

  • conn - Connection process
  • collection - Collection name
  • data - Data to upsert (list of maps or Data struct)
  • opts - Options

Options

  • :db_name - Database name (default: "")
  • :partition_name - Partition to upsert into (default: "")
  • :partial_update - When true, only the fields present in each row are updated and omitted fields keep their existing values. The primary key must be present in every row, and all rows must share the same key set. When false (default), omitted fields are overwritten to null, matching the original Milvus upsert semantics.

Returns

  • {:ok, %{upsert_count: count, ids: ids}} on success
  • {:error, error} on failure