feat: Introduce enqueue_all support as part of the Rails 7.1 perform_all_later method.#103
feat: Introduce enqueue_all support as part of the Rails 7.1 perform_all_later method.#103CelticMajora wants to merge 7 commits intomainfrom
Conversation
Implements ActiveJob 7.1's enqueue_all adapter contract so that ActiveJob.perform_all_later issues a single bulk INSERT rather than looping through per-job enqueues. Per the Rails bulk-enqueue contract, this skips ActiveJob's before/around/after_enqueue callbacks, sets provider_job_id and successfully_enqueued on each job, and returns the count of successfully enqueued jobs. Since insert_all bypasses ActiveRecord callbacks, Delayed::Job's before_save hooks (set_default_run_at, set_name) are replicated inline while building each row. Jobs configured to run inline (Delayed::Worker.delay_jobs != true) fall back to the per-job path so that invoke_job semantics are preserved. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Wraps per-job attribute-building inside Delayed.lifecycle run_callbacks(:enqueue, dj) so Delayed plugins (monitoring, instrumentation, etc.) continue to observe each job as it is bulk-enqueued via perform_all_later. The callback block invokes dj.hook(:enqueue) for parity with the single-enqueue path; JobWrapper explicitly no-ops that hook, as before. ActiveJob's own before/around/after_enqueue callbacks remain skipped, per the Rails 7.1 bulk-enqueue contract. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Replaces two inline ActiveSupport::Notifications subscriptions with the shared `emit_notification` matcher from spec/helper.rb, matching the precedent in monitor_spec.rb. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Replaces the adapter's inline replication of Delayed::Job's before_save callbacks with a single `before_save_hooks` method on the model, and passes `record_timestamps: true` to insert_all so ActiveRecord stamps created_at/updated_at during the bulk insert rather than the adapter hoisting db_time_now and assigning them manually. Keeps the bulk path automatically in sync with any future before_save additions to Delayed::Job. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
insert_all with `returning: %w(id)` raises ArgumentError on MySQL (supports_insert_returning? is false unless MariaDB 10.5+). Dropping `returning:` alone would leave `result.rows` empty on those adapters, breaking the `provider_job_id` contract. Introduces a `bulk_enqueue_supported?` predicate guarding the bulk path on both `Delayed::Worker.delay_jobs == true` and `connection.supports_insert_returning?`. MySQL callers still get correct semantics via the per-job fallback; PostgreSQL and SQLite keep the single-INSERT fast path. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…lity The bulk path relies on three ActiveJob/Rails 7.1+ APIs that don't exist in Rails 6.0/6.1/7.0: - insert_all(record_timestamps: true) (Rails 7.0+) - ActiveJob::Base#set instance method (AJ 7.1+) - successfully_enqueued= setter (AJ 7.1+) Since perform_all_later itself is AJ 7.1+, there's no caller for enqueue_all on earlier versions. Define the public method only when ActiveJob >= 7.1 and guard the spec block the same way. Private helpers stay defined unconditionally for simpler class layout; they are unreachable on <7.1 since only enqueue_all calls them. Verified against rails-6-0, rails-6-1, rails-7-0 appraisals. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
| # Wrap in a transaction so a mid-loop failure (e.g. StaleEnqueueError on | ||
| # job N) rolls back jobs 0..N-1, matching the bulk path's all-or-nothing | ||
| # semantics. | ||
| Delayed::Job.transaction do | ||
| jobs.count do |job| | ||
| opts = job.scheduled_at ? { run_at: coerce_scheduled_at(job.scheduled_at) } : {} | ||
| _enqueue(job, opts) | ||
| job.successfully_enqueued = true | ||
| true | ||
| end | ||
| end |
There was a problem hiding this comment.
We may be better off raiseing in this case for now, as holding a long-running transaction around an N+1 insert will perform poorly under realistic conditions.
There was a problem hiding this comment.
Would it be preferred to have this delegate as if enqueue_all weren't implemented?
There was a problem hiding this comment.
Not sure I follow. I think we should allow enqueue_all to proceed if it is supported, or raise an exception if the caller attempts to call it and it is not supported.
| end | ||
|
|
||
| def coerce_scheduled_at(value) | ||
| value.is_a?(Numeric) ? Time.at(value) : value # rubocop:disable Rails/TimeZone |
There was a problem hiding this comment.
Does job.scheduled_at really need to be coerced? Seems reasonable to expect the implementation to construct a valid Time object.
There was a problem hiding this comment.
I wasn't sure if we needed to support old versions of ActiveJob where it was a numeric: https://apidock.com/rails/ActiveJob/Core/scheduled_at%3D
There was a problem hiding this comment.
Ah, looks like that landed in 7.1, so we'd need to restrict our activejob dependency in turn. The activejob dependency is technically optional, but based on our appraisals we still support >= 6.0... 🤔
I think we can consider changing support in a separate PR and keep the coercion in place here, if that makes sense to you.
| job.provider_job_id = id | ||
| job.successfully_enqueued = true |
There was a problem hiding this comment.
Setting successfully_enqueued actually diverges from the singleton _enqueue method, and makes me wonder if we should be setting successfully_enqueued down there as well.
Additionally, the way that _enqueue handles the provider ID assignment has always felt a little vestigial, as the current implementation cannot know the ID before it has persisted the job, at which point it would be pointlessly costly to issue an UPDATE to persist the ID into the ActiveJob attributes. As a result, the ActiveJob attributes always have a null provider_job_id upon read, so we make a point not to rely on it.
My thinking is that we could simplify this method significantly (and support MySQL) if we skip handling provider_job_id after persistence.
(IMO, if we actually wanted to set it in the ActiveJob payload, we should generate a uuidv7 up front and persist that both to the jobs table and the ActiveJob payload. But proper support starts to tug at other behaviors like retry_on, which can pass the same ActiveJob payload through multiple provider job rows, each with their own execution IDs.)
| # on <7.1. | ||
| if ActiveJob.gem_version >= Gem::Version.new('7.1') | ||
| def enqueue_all(jobs) | ||
| return 0 if jobs.empty? |
There was a problem hiding this comment.
It's not clear to me if enqueue_all is required to return anything: https://github.com/rails/rails/blob/eb126bb140127d3589ad9be093a845e24fc4f475/activejob/lib/active_job/enqueuing.rb#L19
There was a problem hiding this comment.
Other adapters I was looking at(ie: Solid Queue and Sidekiq) all returned the count, but it was unclear whether that was part of the contract.
| def before_save_hooks | ||
| set_default_run_at | ||
| set_name | ||
| end |
There was a problem hiding this comment.
It would be nice to avoid expanding the public API of the Job model -- I understand wanting to avoid drift, but it also doesn't stop someone from adding a before_save and skipping callbacks again.
My thinking is that these should not be before_save calls at all and instead should be handled by the enqueue and enqueue_all methods. We could retain validations on this class to ensure that they are always set by the caller, but remove some of the magic so that the caller is always forced to call these two methods (or supply their own values) before saving.
There was a problem hiding this comment.
Actually instead of validations, for our purposes, ensuring that we have appropriate DB constraints would be enough. (Otherwise we'd need to call validate! on each job while constructing the insert all payload, and that might not be worth the minor performance hit if we can get the guarantee from the DB itself.)
smudge
left a comment
There was a problem hiding this comment.
TAFN -- Really excited for this! I left a few comments, but overall I'm looking to unify the implementation of enqueue and enqueue_all as much as possible. (Like, in theory, enqueue(job) could defined as enqueue_all([job]), and I'd like to see how close to that we can get!)
As it stands, my read is that the enqueue_all path makes an effort to replicate the lifecycle hooks of Delayed::Backend::Base#enqueue (e.g. using JobPreparer, calling the before_save methods, and running the :enqueue callback for each one), but the codepaths are not shared and would be subject to drift.
Additionally, pushing ourselves closer to a shared enqueue_all(one_or_more_jobs) implementation could also force some interesting questions about the necessity of some of these extra enqueue-time operations like the :enqueue lifecycle hook. I think there are some compelling arguments for dropping support for per-job operations/configurations that could instead be managed in bulk. (Like, if :enqueue as a lifecycle hook were to receive a list of jobs rather than a single job -- obv. a breaking change, but gets us the observability hook without the N+1 calls against each individual job object.)
I'm mostly spitballing in that last paragraph, but wanted to start a conversation, because we're in a territory where I think it would already be pretty reasonable to cut a new major version and break some older behaviors/assumptions.
Implements
enqueue_allin order to support Rails 7.1 perform_all_laterKey decisions:
enqueue_after_transaction_commit:enqueuelifecycle hooksCurious on the preferred approach on navigating Rails versions and differing SQL implementations(both in code and specs)
/domain @smudge @effron
/platform @smudge @effron