diff --git a/app/jobs/scheduled/embeddings_backfill.rb b/app/jobs/scheduled/embeddings_backfill.rb index 28b7c25a..3300b479 100644 --- a/app/jobs/scheduled/embeddings_backfill.rb +++ b/app/jobs/scheduled/embeddings_backfill.rb @@ -35,8 +35,6 @@ module Jobs rebaked += populate_topic_embeddings(vector_rep, topics) - vector_rep.consider_indexing - return if rebaked >= limit # Then, we'll try to backfill embeddings for topics that have outdated @@ -82,8 +80,6 @@ module Jobs rebaked += 1 end - vector_rep.consider_indexing - return if rebaked >= limit # Then, we'll try to backfill embeddings for posts that have outdated diff --git a/db/migrate/20240611170905_move_embeddings_to_single_table_per_type.rb b/db/migrate/20240611170905_move_embeddings_to_single_table_per_type.rb index 55563763..a21a24b8 100644 --- a/db/migrate/20240611170905_move_embeddings_to_single_table_per_type.rb +++ b/db/migrate/20240611170905_move_embeddings_to_single_table_per_type.rb @@ -150,7 +150,6 @@ class MoveEmbeddingsToSingleTablePerType < ActiveRecord::Migration[7.0] strategy = DiscourseAi::Embeddings::Strategies::Truncation.new vector_rep = DiscourseAi::Embeddings::VectorRepresentations::Base.current_representation(strategy) - vector_rep.consider_indexing rescue StandardError => e Rails.logger.error("Failed to index embeddings: #{e}") end diff --git a/db/migrate/20241008054440_create_binary_indexes_for_embeddings.rb b/db/migrate/20241008054440_create_binary_indexes_for_embeddings.rb new file mode 100644 index 00000000..aa31a4e3 --- /dev/null +++ b/db/migrate/20241008054440_create_binary_indexes_for_embeddings.rb @@ -0,0 +1,27 @@ +# frozen_string_literal: true + +class CreateBinaryIndexesForEmbeddings < ActiveRecord::Migration[7.1] + def up + %w[topic post document_fragment].each do |type| + # our supported embeddings models IDs and dimensions + [ + [1, 768], + [2, 1536], + [3, 1024], + [4, 1024], + [5, 768], + [6, 1536], + [7, 2000], + [8, 1024], + ].each { |model_id, dimensions| execute <<-SQL } + CREATE INDEX ai_#{type}_embeddings_#{model_id}_1_search_bit ON ai_#{type}_embeddings + USING hnsw ((binary_quantize(embeddings)::bit(#{dimensions})) bit_hamming_ops) + WHERE model_id = #{model_id} AND strategy_id = 1; + SQL + end + end + + def down + raise ActiveRecord::IrreversibleMigration + end +end diff --git a/db/post_migrate/20241008055831_drop_old_embeddings_indexes.rb b/db/post_migrate/20241008055831_drop_old_embeddings_indexes.rb new file mode 100644 index 00000000..347d2b59 --- /dev/null +++ b/db/post_migrate/20241008055831_drop_old_embeddings_indexes.rb @@ -0,0 +1,37 @@ +# frozen_string_literal: true +class DropOldEmbeddingsIndexes < ActiveRecord::Migration[7.1] + def up + execute <<~SQL + DROP INDEX IF EXISTS ai_topic_embeddings_1_1_search; + DROP INDEX IF EXISTS ai_topic_embeddings_2_1_search; + DROP INDEX IF EXISTS ai_topic_embeddings_3_1_search; + DROP INDEX IF EXISTS ai_topic_embeddings_4_1_search; + DROP INDEX IF EXISTS ai_topic_embeddings_5_1_search; + DROP INDEX IF EXISTS ai_topic_embeddings_6_1_search; + DROP INDEX IF EXISTS ai_topic_embeddings_7_1_search; + DROP INDEX IF EXISTS ai_topic_embeddings_8_1_search; + + DROP INDEX IF EXISTS ai_post_embeddings_1_1_search; + DROP INDEX IF EXISTS ai_post_embeddings_2_1_search; + DROP INDEX IF EXISTS ai_post_embeddings_3_1_search; + DROP INDEX IF EXISTS ai_post_embeddings_4_1_search; + DROP INDEX IF EXISTS ai_post_embeddings_5_1_search; + DROP INDEX IF EXISTS ai_post_embeddings_6_1_search; + DROP INDEX IF EXISTS ai_post_embeddings_7_1_search; + DROP INDEX IF EXISTS 
ai_post_embeddings_8_1_search; + + DROP INDEX IF EXISTS ai_document_fragment_embeddings_1_1_search; + DROP INDEX IF EXISTS ai_document_fragment_embeddings_2_1_search; + DROP INDEX IF EXISTS ai_document_fragment_embeddings_3_1_search; + DROP INDEX IF EXISTS ai_document_fragment_embeddings_4_1_search; + DROP INDEX IF EXISTS ai_document_fragment_embeddings_5_1_search; + DROP INDEX IF EXISTS ai_document_fragment_embeddings_6_1_search; + DROP INDEX IF EXISTS ai_document_fragment_embeddings_7_1_search; + DROP INDEX IF EXISTS ai_document_fragment_embeddings_8_1_search; + SQL + end + + def down + raise ActiveRecord::IrreversibleMigration + end +end diff --git a/lib/embeddings/vector_representations/base.rb b/lib/embeddings/vector_representations/base.rb index db4d4a82..d8236181 100644 --- a/lib/embeddings/vector_representations/base.rb +++ b/lib/embeddings/vector_representations/base.rb @@ -46,113 +46,6 @@ module DiscourseAi @strategy = strategy end - def consider_indexing(memory: "100MB") - [topic_table_name, post_table_name].each do |table_name| - index_name = index_name(table_name) - # Using extension maintainer's recommendation for ivfflat indexes - # Results are not as good as without indexes, but it's much faster - # Disk usage is ~1x the size of the table, so this doubles table total size - count = - DB.query_single( - "SELECT count(*) FROM #{table_name} WHERE model_id = #{id} AND strategy_id = #{@strategy.id};", - ).first - lists = [count < 1_000_000 ? count / 1000 : Math.sqrt(count).to_i, 10].max - probes = [count < 1_000_000 ? lists / 10 : Math.sqrt(lists).to_i, 1].max - Discourse.cache.write("#{table_name}-#{id}-#{@strategy.id}-probes", probes) - - existing_index = DB.query_single(<<~SQL, index_name: index_name).first - SELECT - indexdef - FROM - pg_indexes - WHERE - indexname = :index_name - AND schemaname = 'public' - LIMIT 1 - SQL - - if !existing_index.present? - Rails.logger.info("Index #{index_name} does not exist, creating...") - return create_index!(table_name, memory, lists, probes) - end - - existing_index_age = - DB - .query_single( - "SELECT pg_catalog.obj_description((:index_name)::regclass, 'pg_class');", - index_name: index_name, - ) - .first - .to_i || 0 - new_rows = - DB.query_single( - "SELECT count(*) FROM #{table_name} WHERE model_id = #{id} AND strategy_id = #{@strategy.id} AND created_at > '#{Time.at(existing_index_age)}';", - ).first - existing_lists = existing_index.match(/lists='(\d+)'/)&.captures&.first&.to_i - - if existing_index_age > 0 && - existing_index_age < - ( - if SiteSetting.ai_embeddings_semantic_related_topics_enabled - 1.hour.ago.to_i - else - 1.day.ago.to_i - end - ) - if new_rows > 10_000 - Rails.logger.info( - "Index #{index_name} is #{existing_index_age} seconds old, and there are #{new_rows} new rows, updating...", - ) - return create_index!(table_name, memory, lists, probes) - elsif existing_lists != lists - Rails.logger.info( - "Index #{index_name} already exists, but lists is #{existing_lists} instead of #{lists}, updating...", - ) - return create_index!(table_name, memory, lists, probes) - end - end - - Rails.logger.info( - "Index #{index_name} kept. 
#{Time.now.to_i - existing_index_age} seconds old, #{new_rows} new rows, #{existing_lists} lists, #{probes} probes.",
-            )
-          end
-        end
-
-        def create_index!(table_name, memory, lists, probes)
-          tries = 0
-          index_name = index_name(table_name)
-          DB.exec("SET work_mem TO '#{memory}';")
-          DB.exec("SET maintenance_work_mem TO '#{memory}';")
-          begin
-            DB.exec(<<~SQL)
-              DROP INDEX IF EXISTS #{index_name};
-              CREATE INDEX IF NOT EXISTS
-                #{index_name}
-              ON
-                #{table_name}
-              USING
-                ivfflat ((embeddings::halfvec(#{dimensions})) #{pg_index_type})
-              WITH
-                (lists = #{lists})
-              WHERE
-                model_id = #{id} AND strategy_id = #{@strategy.id};
-            SQL
-          rescue PG::ProgramLimitExceeded => e
-            parsed_error = e.message.match(/memory required is (\d+ [A-Z]{2}), ([a-z_]+)/)
-            if parsed_error[1].present? && parsed_error[2].present?
-              DB.exec("SET #{parsed_error[2]} TO '#{parsed_error[1].tr(" ", "")}';")
-              tries += 1
-              retry if tries < 3
-            else
-              raise e
-            end
-          end
-
-          DB.exec("COMMENT ON INDEX #{index_name} IS '#{Time.now.to_i}';")
-          DB.exec("RESET work_mem;")
-          DB.exec("RESET maintenance_work_mem;")
-        end
-
         def vector_from(text, asymetric: false)
           raise NotImplementedError
         end
@@ -224,14 +117,23 @@

         def asymmetric_topics_similarity_search(raw_vector, limit:, offset:, return_distance: false)
           results = DB.query(<<~SQL, query_embedding: raw_vector, limit: limit, offset: offset)
-            #{probes_sql(topic_table_name)}
+            WITH candidates AS (
+              SELECT
+                topic_id,
+                embeddings::halfvec(#{dimensions}) AS embeddings
+              FROM
+                #{topic_table_name}
+              WHERE
+                model_id = #{id} AND strategy_id = #{@strategy.id}
+              ORDER BY
+                binary_quantize(embeddings)::bit(#{dimensions}) <~> binary_quantize('[:query_embedding]'::halfvec(#{dimensions}))
+              LIMIT :limit * 2
+            )
             SELECT
               topic_id,
               embeddings::halfvec(#{dimensions}) #{pg_function} '[:query_embedding]'::halfvec(#{dimensions}) AS distance
             FROM
-              #{topic_table_name}
-            WHERE
-              model_id = #{id} AND strategy_id = #{@strategy.id}
+              candidates
             ORDER BY
               embeddings::halfvec(#{dimensions}) #{pg_function} '[:query_embedding]'::halfvec(#{dimensions})
             LIMIT :limit
@@ -250,18 +152,23 @@

         def asymmetric_posts_similarity_search(raw_vector, limit:, offset:, return_distance: false)
           results = DB.query(<<~SQL, query_embedding: raw_vector, limit: limit, offset: offset)
-            #{probes_sql(post_table_name)}
+            WITH candidates AS (
+              SELECT
+                post_id,
+                embeddings::halfvec(#{dimensions}) AS embeddings
+              FROM
+                #{post_table_name}
+              WHERE
+                model_id = #{id} AND strategy_id = #{@strategy.id}
+              ORDER BY
+                binary_quantize(embeddings)::bit(#{dimensions}) <~> binary_quantize('[:query_embedding]'::halfvec(#{dimensions}))
+              LIMIT :limit * 2
+            )
             SELECT
               post_id,
               embeddings::halfvec(#{dimensions}) #{pg_function} '[:query_embedding]'::halfvec(#{dimensions}) AS distance
             FROM
-              #{post_table_name}
-            INNER JOIN
-              posts AS p ON p.id = post_id
-            INNER JOIN
-              topics AS t ON t.id = p.topic_id AND t.archetype = 'regular'
-            WHERE
-              model_id = #{id} AND strategy_id = #{@strategy.id}
+              candidates
             ORDER BY
               embeddings::halfvec(#{dimensions}) #{pg_function} '[:query_embedding]'::halfvec(#{dimensions})
             LIMIT :limit
@@ -286,32 +193,41 @@
           offset:,
           return_distance: false
         )
+          # Too low a limit exacerbates the recall loss of binary quantization
+          binary_search_limit = [limit * 2, 100].max
           results =
             DB.query(
               <<~SQL,
-                #{probes_sql(post_table_name)}
-                SELECT
-                  rag_document_fragment_id,
-                  embeddings::halfvec(#{dimensions}) #{pg_function} '[:query_embedding]'::halfvec(#{dimensions}) AS distance
-                FROM
-                  
#{rag_fragments_table_name} - INNER JOIN - rag_document_fragments AS rdf ON rdf.id = rag_document_fragment_id - WHERE - model_id = #{id} AND - strategy_id = #{@strategy.id} AND - rdf.target_id = :target_id AND - rdf.target_type = :target_type - ORDER BY - embeddings::halfvec(#{dimensions}) #{pg_function} '[:query_embedding]'::halfvec(#{dimensions}) - LIMIT :limit - OFFSET :offset - SQL + WITH candidates AS ( + SELECT + rag_document_fragment_id, + embeddings::halfvec(#{dimensions}) AS embeddings + FROM + #{rag_fragments_table_name} + INNER JOIN + rag_document_fragments ON rag_document_fragments.id = rag_document_fragment_id + WHERE + model_id = #{id} AND strategy_id = #{@strategy.id} + ORDER BY + binary_quantize(embeddings)::bit(#{dimensions}) <~> binary_quantize('[:query_embedding]'::halfvec(#{dimensions})) + LIMIT :binary_search_limit + ) + SELECT + rag_document_fragment_id, + embeddings::halfvec(#{dimensions}) #{pg_function} '[:query_embedding]'::halfvec(#{dimensions}) AS distance + FROM + candidates + ORDER BY + embeddings::halfvec(#{dimensions}) #{pg_function} '[:query_embedding]'::halfvec(#{dimensions}) + LIMIT :limit + OFFSET :offset + SQL query_embedding: raw_vector, target_id: target_id, target_type: target_type, limit: limit, offset: offset, + binary_search_limit: binary_search_limit, ) if return_distance @@ -326,17 +242,8 @@ module DiscourseAi def symmetric_topics_similarity_search(topic) DB.query(<<~SQL, topic_id: topic.id).map(&:topic_id) - #{probes_sql(topic_table_name)} - SELECT - topic_id - FROM - #{topic_table_name} - WHERE - model_id = #{id} AND - strategy_id = #{@strategy.id} - ORDER BY - embeddings::halfvec(#{dimensions}) #{pg_function} ( - SELECT + WITH le_target AS ( + SELECT embeddings FROM #{topic_table_name} @@ -345,8 +252,34 @@ module DiscourseAi strategy_id = #{@strategy.id} AND topic_id = :topic_id LIMIT 1 - )::halfvec(#{dimensions}) - LIMIT 100 + ) + SELECT topic_id FROM ( + SELECT + topic_id, embeddings + FROM + #{topic_table_name} + WHERE + model_id = #{id} AND + strategy_id = #{@strategy.id} + ORDER BY + binary_quantize(embeddings)::bit(#{dimensions}) <~> ( + SELECT + binary_quantize(embeddings)::bit(#{dimensions}) + FROM + le_target + LIMIT 1 + ) + LIMIT 200 + ) AS widenet + ORDER BY + embeddings::halfvec(#{dimensions}) #{pg_function} ( + SELECT + embeddings::halfvec(#{dimensions}) + FROM + le_target + LIMIT 1 + ) + LIMIT 100; SQL rescue PG::Error => e Rails.logger.error( @@ -384,11 +317,6 @@ module DiscourseAi "#{table_name}_#{id}_#{@strategy.id}_search" end - def probes_sql(table_name) - probes = Discourse.cache.read("#{table_name}-#{id}-#{@strategy.id}-probes") - probes.present? ? "SET LOCAL ivfflat.probes TO #{probes};" : "" - end - def name raise NotImplementedError end diff --git a/lib/tasks/modules/embeddings/database.rake b/lib/tasks/modules/embeddings/database.rake index 1e6a55dc..83f652a0 100644 --- a/lib/tasks/modules/embeddings/database.rake +++ b/lib/tasks/modules/embeddings/database.rake @@ -44,11 +44,3 @@ task "ai:embeddings:backfill", %i[model concurrency] => [:environment] do |_, ar end end end - -desc "Creates indexes for embeddings" -task "ai:embeddings:index", [:work_mem] => [:environment] do |_, args| - strategy = DiscourseAi::Embeddings::Strategies::Truncation.new - vector_rep = DiscourseAi::Embeddings::VectorRepresentations::Base.current_representation(strategy) - - vector_rep.consider_indexing(memory: args[:work_mem] || "100MB") -end
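Note on the pattern above: the per-model ivfflat indexes (and the consider_indexing / probes_sql machinery that maintained them) are replaced by binary-quantized HNSW indexes, and each similarity query becomes a two-stage search: a cheap Hamming-distance pre-filter over binary_quantize(embeddings), served by the new *_search_bit indexes and deliberately oversampled, followed by an exact re-rank of those candidates against the full halfvec embeddings. Below is a minimal standalone sketch of the same idea, using a hypothetical items table, a 3-dimensional vector column and cosine distance (the real tables, dimensions and distance operator vary per embeddings model), assuming pgvector >= 0.7.0:

    -- requires pgvector >= 0.7.0 (binary_quantize, bit_hamming_ops, <~>)
    CREATE EXTENSION IF NOT EXISTS vector;
    CREATE TABLE items (id bigserial PRIMARY KEY, embedding vector(3));
    INSERT INTO items (embedding) VALUES ('[1,2,3]'), ('[-1,0,2]'), ('[0.5,0.5,0.5]');

    -- expression index over the bit-quantized embeddings, analogous to the
    -- ai_*_embeddings_<model_id>_1_search_bit indexes created in the migration
    CREATE INDEX items_search_bit ON items
      USING hnsw ((binary_quantize(embedding)::bit(3)) bit_hamming_ops);

    -- stage 1: index-assisted Hamming-distance pre-filter, oversampled;
    -- stage 2: exact cosine re-rank of the surviving candidates
    SELECT id, embedding <=> '[1,1,1]' AS distance
    FROM (
      SELECT id, embedding
      FROM items
      ORDER BY binary_quantize(embedding)::bit(3) <~> binary_quantize('[1,1,1]')
      LIMIT 20
    ) candidates
    ORDER BY embedding <=> '[1,1,1]'
    LIMIT 5;

The oversampled inner LIMIT plays the same role as binary_search_limit ([limit * 2, 100].max) in asymmetric_rag_fragment_similarity_search: the wider the candidate net, the less recall the bit quantization costs.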