diff --git a/lib/sentiment/post_classification.rb b/lib/sentiment/post_classification.rb
index ce7302a4..47da865f 100644
--- a/lib/sentiment/post_classification.rb
+++ b/lib/sentiment/post_classification.rb
@@ -44,57 +44,76 @@ module DiscourseAi
         Post.from(Arel.sql("(#{unioned_queries}) as posts"))
       end
 
+      CONCURRENT_CLASSFICATIONS = 40
+
+      # Classifies each record in `relation` with every available classifier
+      # that has not already produced a result for it. Requests run on a
+      # bounded thread pool; results are stored on the calling thread, and
+      # request failures are collected and reported once at the end via
+      # Discourse.warn_exception rather than aborting the batch.
       def bulk_classify!(relation)
-        http_pool_size = 100
         pool =
-          Concurrent::CachedThreadPool.new(
+          Scheduler::ThreadPool.new(
             min_threads: 0,
-            max_threads: http_pool_size,
-            idletime: 30,
+            max_threads: CONCURRENT_CLASSFICATIONS,
+            idle_time: 30,
           )
 
         available_classifiers = classifiers
         return if available_classifiers.blank?
 
-        promised_classifications =
-          relation
-            .map do |record|
-              text = prepare_text(record)
-              next if text.blank?
+        results = Queue.new
+        queued = 0
 
-              Concurrent::Promises
-                .fulfilled_future({ target: record, text: text }, pool)
-                .then_on(pool) do |w_text|
-                  results = Concurrent::Hash.new
-                  already_classified = w_text[:target].sentiment_classifications.map(&:model_used)
+        relation.each do |record|
+          text = prepare_text(record)
+          next if text.blank?
 
-                  classifiers_for_target =
-                    available_classifiers.reject do |ac|
-                      already_classified.include?(ac[:model_name])
-                    end
+          # Use `map`, not `pluck(&:model_used)`: pluck takes column names and
+          # ignores a block, so the skip list would never contain model names.
+          already_classified = record.sentiment_classifications.map(&:model_used)
+          missing_classifiers =
+            available_classifiers.reject { |ac| already_classified.include?(ac[:model_name]) }
 
-                  promised_target_results =
-                    classifiers_for_target.map do |cft|
-                      Concurrent::Promises.future_on(pool) do
-                        results[cft[:model_name]] = request_with(cft[:client], w_text[:text])
-                      end
-                    end
-
-                  Concurrent::Promises
-                    .zip(*promised_target_results)
-                    .then_on(pool) { |_| w_text.merge(classification: results) }
-                end
-                .flat(1)
+          missing_classifiers.each do |classifier|
+            pool.post do
+              result = { target: record, classifier: classifier, text: text }
+              begin
+                result[:classification] = request_with(classifier[:client], text)
+              rescue StandardError => e
+                result[:error] = e
+              end
+              results << result
             end
-            .compact
+            queued += 1
+          end
+        end
 
-        Concurrent::Promises
-          .zip(*promised_classifications)
-          .value!
-          .each { |r| store_classification(r[:target], r[:classification]) }
+        errors = []
+
+        while queued > 0
+          result = results.pop
+          if result[:error]
+            errors << result
+          else
+            store_classification(
+              result[:target],
+              [[result[:classifier][:model_name], result[:classification]]],
+            )
+          end
+          queued -= 1
+        end
+
+        if errors.any?
+          example_posts = errors.map { |e| e[:target].id }.take(5).join(", ")
+          Discourse.warn_exception(
+            errors[0][:error],
+            "Discourse AI: Errors during bulk classification: Failed to classify #{errors.count} posts (example ids: #{example_posts})",
+          )
+        end
       ensure
         pool.shutdown
-        pool.wait_for_termination
+        pool.wait_for_termination(timeout: 30)
       end
 
       def classify!(target)