Skip to content

DM-53494: Add ability to run submit processes as batch jobs.#232

Draft
MichelleGower wants to merge 2 commits intomainfrom
tickets/DM-53494
Draft

DM-53494: Add ability to run submit processes as batch jobs.#232
MichelleGower wants to merge 2 commits intomainfrom
tickets/DM-53494

Conversation

@MichelleGower
Copy link
Copy Markdown
Collaborator

Note: Limited to same cluster with shared filesystems.

Checklist

  • ran Jenkins
  • added a release note for user-visible changes to doc/changes

@codecov
Copy link
Copy Markdown

codecov bot commented Feb 20, 2026

❌ 7 Tests Failed:

Tests completed Failed Passed Skipped
326 7 319 0
View the top 3 failed test(s) by shortest run time
tests/test_drivers.py::TestBatchSubmitDriver::testDryRun
Stack Traces | 0.004s run time
.../hostedtoolcache/Python/3.12.13...................../x64/lib/python3.12/unittest/mock.py:1393: in patched
    with self.decoration_helper(patched,
.../hostedtoolcache/Python/3.12.13...................../x64/lib/python3.12/contextlib.py:137: in __enter__
    return next(self.gen)
           ^^^^^^^^^^^^^^
.../hostedtoolcache/Python/3.12.13...................../x64/lib/python3.12/unittest/mock.py:1375: in decoration_helper
    arg = exit_stack.enter_context(patching)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.../hostedtoolcache/Python/3.12.13...................../x64/lib/python3.12/contextlib.py:526: in enter_context
    result = _enter(cm)
             ^^^^^^^^^^
.../hostedtoolcache/Python/3.12.13...................../x64/lib/python3.12/unittest/mock.py:1467: in __enter__
    original, local = self.get_original()
                      ^^^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <unittest.mock._patch object at 0x7f476b5e5af0>

    def get_original(self):
        target = self.getter()
        name = self.attribute
    
        original = DEFAULT
        local = False
    
        try:
            original = target.__dict__[name]
        except (AttributeError, KeyError):
            original = getattr(target, name, DEFAULT)
        else:
            local = True
    
        if name in _builtins and isinstance(target, ModuleType):
            self.create = True
    
        if not self.create and original is DEFAULT:
>           raise AttributeError(
                "%s does not have the attribute %r" % (target, name)
            )
E           AttributeError: <module 'lsst.ctrl.bps.drivers' from '.../hostedtoolcache/Python/3.12.13...................../x64/lib/python3.12.../ctrl/bps/drivers.py'> does not have the attribute 'create_batch_stages'

.../hostedtoolcache/Python/3.12.13...................../x64/lib/python3.12/unittest/mock.py:1437: AttributeError
tests/test_drivers.py::TestBatchSubmitDriver::testSubmit
Stack Traces | 0.004s run time
.../hostedtoolcache/Python/3.12.13...................../x64/lib/python3.12/unittest/mock.py:1393: in patched
    with self.decoration_helper(patched,
.../hostedtoolcache/Python/3.12.13...................../x64/lib/python3.12/contextlib.py:137: in __enter__
    return next(self.gen)
           ^^^^^^^^^^^^^^
.../hostedtoolcache/Python/3.12.13...................../x64/lib/python3.12/unittest/mock.py:1375: in decoration_helper
    arg = exit_stack.enter_context(patching)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.../hostedtoolcache/Python/3.12.13...................../x64/lib/python3.12/contextlib.py:526: in enter_context
    result = _enter(cm)
             ^^^^^^^^^^
.../hostedtoolcache/Python/3.12.13...................../x64/lib/python3.12/unittest/mock.py:1467: in __enter__
    original, local = self.get_original()
                      ^^^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <unittest.mock._patch object at 0x7f476b5e5cd0>

    def get_original(self):
        target = self.getter()
        name = self.attribute
    
        original = DEFAULT
        local = False
    
        try:
            original = target.__dict__[name]
        except (AttributeError, KeyError):
            original = getattr(target, name, DEFAULT)
        else:
            local = True
    
        if name in _builtins and isinstance(target, ModuleType):
            self.create = True
    
        if not self.create and original is DEFAULT:
>           raise AttributeError(
                "%s does not have the attribute %r" % (target, name)
            )
E           AttributeError: <module 'lsst.ctrl.bps.drivers' from '.../hostedtoolcache/Python/3.12.13...................../x64/lib/python3.12.../ctrl/bps/drivers.py'> does not have the attribute 'create_batch_stages'

.../hostedtoolcache/Python/3.12.13...................../x64/lib/python3.12/unittest/mock.py:1437: AttributeError
tests/test_drivers.py::TestBatchAcquireDriver::testSuccess
Stack Traces | 0.005s run time
self = <test_drivers.TestBatchAcquireDriver testMethod=testSuccess>
mock_acquire = <MagicMock name='acquire_quantum_graph' id='139944718985280'>

    @unittest.mock.patch("lsst.ctrl.bps.drivers.acquire_quantum_graph")
    def testSuccess(self, mock_acquire):
>       drivers.batch_acquire_driver(self.config_file)

tests/test_drivers.py:372: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

config_file = '.../tmp/tmpv5orxmfn/config.yaml', kwargs = {}
config = BpsConfig({'bps_cmdline': {}, 'bps_defined': {'submitPath': '/tmp/tmpv5orxmfn'}, 'cloud': {}, 'cluster': {}, 'payload': {}, 'pipetask': {}, 'site': {}})

    def batch_acquire_driver(config_file: str, **kwargs: Any) -> None:
        """Create a quantum graph from pipeline definition in a batch job.
    
        Parameters
        ----------
        config_file : `str`
            Name of the configuration file.
        **kwargs : `~typing.Any`
            Additional modifiers to the configuration.
        """
        config = BpsConfig(config_file)
>       translate_command_line_values(config, **kwargs)
E       TypeError: translate_command_line_values() missing 1 required positional argument: 'kwargs'

.../hostedtoolcache/Python/3.12.13.../x64/lib/python3.12.../ctrl/bps/drivers.py:719: TypeError
tests/test_drivers.py::TestBatchPrepareDriver::testSuccess
Stack Traces | 0.005s run time
self = <test_drivers.TestBatchPrepareDriver testMethod=testSuccess>
mock_prepare = <MagicMock name='batch_payload_prepare' id='139944716744752'>

    @unittest.mock.patch("lsst.ctrl.bps.drivers.batch_payload_prepare")
    def testSuccess(self, mock_prepare):
>       drivers.batch_prepare_driver(self.config_file)

tests/test_drivers.py:392: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

config_file = '.../tmp/tmp0b_pk5dj/config.yaml', kwargs = {}
config = BpsConfig({'bps_cmdline': {}, 'bps_defined': {'submitPath': '/tmp/tmp0b_pk5dj'}, 'cloud': {}, 'cluster': {}, 'payload': {}, 'pipetask': {}, 'site': {}})

    def batch_prepare_driver(config_file: str, **kwargs: Any) -> None:
        """Run workflow preparation in a batch job for an existing QuantumGraph.
    
        Parameters
        ----------
        config_file : `str`
            Name of the configuration file.
        **kwargs : `~typing.Any`
            Additional modifiers to the configuration.
        """
        config = BpsConfig(config_file)
>       translate_command_line_values(config, **kwargs)
E       TypeError: translate_command_line_values() missing 1 required positional argument: 'kwargs'

.../hostedtoolcache/Python/3.12.13.../x64/lib/python3.12.../ctrl/bps/drivers.py:751: TypeError
tests/test_batch_submit.py::TestCreateBatchStages::testSaving
Stack Traces | 0.014s run time
self = <test_batch_submit.TestCreateBatchStages testMethod=testSaving>

    def testSaving(self):
        config = BpsConfig(
            {
                "configFile": "not_used_configFile",
                "uniqProcName": "uniq_proc_name",
                "operator": "testuser",
                "payload": {"payloadName": "testPayload"},
                "bpsPreCommandOpts": "--long-log --log-level=VERBOSE",
                "buildQuantumGraph": {
                    "jobCommand": "${CTRL_BPS_DIR}/bin/bps batch-acquire {configFile}",
                    "requestMemory": 16384,
                },
                "preparePayloadWorkflow": {
                    "jobCommand": "${CTRL_BPS_DIR}/bin/bps batch-prepare {configFile}",
                    "requestMemory": 24576,
                },
                "saveGenericWorkflow": True,
            }
        )
        with tempfile.TemporaryDirectory() as tmpdir:
>           gw, config = batch_submit.create_batch_stages(config, tmpdir)
                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

tests/test_batch_submit.py:119: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.../hostedtoolcache/Python/3.12.13............/x64/lib/python3.12.../lsst/utils/timer.py:324: in timeMethod_wrapper
    res = func(self, *args, **keyArgs)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.../hostedtoolcache/Python/3.12.13............/x64/lib/python3.12.../ctrl/bps/batch_submit.py:148: in create_batch_stages
    _enhance_command(config, generic_workflow, build_job, job_values)
.../hostedtoolcache/Python/3.12.13............/x64/lib/python3.12.../ctrl/bps/transform.py:241: in _enhance_command
    cached_job_values[gwjob.label][key] = WhenToSaveQuantumGraphs[when_save.upper()]
                                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

cls = <enum 'WhenToSaveQuantumGraphs'>, name = ''

    def __getitem__(cls, name):
        """
        Return the member matching `name`.
        """
>       return cls._member_map_[name]
               ^^^^^^^^^^^^^^^^^^^^^^
E       KeyError: ''

.../hostedtoolcache/Python/3.12.13............/x64/lib/python3.12/enum.py:813: KeyError
tests/test_batch_submit.py::TestCreateBatchStages::testSuccess
Stack Traces | 0.014s run time
self = <test_batch_submit.TestCreateBatchStages testMethod=testSuccess>

    def testSuccess(self):
        # No saving of files
        config = BpsConfig(
            {
                "configFile": "not_used_configFile",
                "uniqProcName": "uniq_proc_name",
                "operator": "testuser",
                "payload": {"payloadName": "testPayload"},
                "bpsPreCommandOpts": "--long-log --log-level=VERBOSE",
                "buildQuantumGraph": {
                    "jobCommand": "${CTRL_BPS_DIR}/bin/bps batch-acquire {configFile}",
                    "requestMemory": 16384,
                },
                "preparePayloadWorkflow": {
                    "jobCommand": "${CTRL_BPS_DIR}/bin/bps batch-prepare {configFile}",
                    "requestMemory": 24576,
                },
            }
        )
    
        with tempfile.TemporaryDirectory() as tmpdir:
>           gw, config = batch_submit.create_batch_stages(config, tmpdir)
                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

tests/test_batch_submit.py:86: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.../hostedtoolcache/Python/3.12.13............/x64/lib/python3.12.../lsst/utils/timer.py:324: in timeMethod_wrapper
    res = func(self, *args, **keyArgs)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.../hostedtoolcache/Python/3.12.13............/x64/lib/python3.12.../ctrl/bps/batch_submit.py:148: in create_batch_stages
    _enhance_command(config, generic_workflow, build_job, job_values)
.../hostedtoolcache/Python/3.12.13............/x64/lib/python3.12.../ctrl/bps/transform.py:241: in _enhance_command
    cached_job_values[gwjob.label][key] = WhenToSaveQuantumGraphs[when_save.upper()]
                                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

cls = <enum 'WhenToSaveQuantumGraphs'>, name = ''

    def __getitem__(cls, name):
        """
        Return the member matching `name`.
        """
>       return cls._member_map_[name]
               ^^^^^^^^^^^^^^^^^^^^^^
E       KeyError: ''

.../hostedtoolcache/Python/3.12.13............/x64/lib/python3.12/enum.py:813: KeyError
tests/test_batch_submit.py::TestCreateBatchStages::testMissingPrepareCmd
Stack Traces | 0.017s run time
self = <test_batch_submit.TestCreateBatchStages testMethod=testMissingPrepareCmd>

    def testMissingPrepareCmd(self):
        """Missing preparePayloadWorkflow jobCommand"""
        config = BpsConfig(
            {
                "configFile": "not_used_configFile",
                "uniqProcName": "uniq_proc_name",
                "operator": "testuser",
                "payload": {"payloadName": "testPayload"},
                "bpsPreCommandOpts": "--long-log --log-level=VERBOSE",
                "buildQuantumGraph": {"jobCommand": "${CTRL_BPS_DIR}/bin/bps batch-acquire {configFile}"},
            }
        )
        with self.assertRaisesRegex(
            RuntimeError,
            "Missing executable for preparePayloadWorkflow.  Double check submit yaml for jobCommand",
        ):
>           _ = batch_submit.create_batch_stages(config, "not_used_prefix")
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

tests/test_batch_submit.py:63: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.../hostedtoolcache/Python/3.12.13............/x64/lib/python3.12.../lsst/utils/timer.py:324: in timeMethod_wrapper
    res = func(self, *args, **keyArgs)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.../hostedtoolcache/Python/3.12.13............/x64/lib/python3.12.../ctrl/bps/batch_submit.py:148: in create_batch_stages
    _enhance_command(config, generic_workflow, build_job, job_values)
.../hostedtoolcache/Python/3.12.13............/x64/lib/python3.12.../ctrl/bps/transform.py:241: in _enhance_command
    cached_job_values[gwjob.label][key] = WhenToSaveQuantumGraphs[when_save.upper()]
                                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

    def __getitem__(cls, name):
        """
        Return the member matching `name`.
        """
>       return cls._member_map_[name]
               ^^^^^^^^^^^^^^^^^^^^^^
E       KeyError: ''

.../hostedtoolcache/Python/3.12.13............/x64/lib/python3.12/enum.py:813: KeyError

To view more test analytics, go to the Test Analytics Dashboard
📋 Got 3 mins? Take this short survey to help us improve Test Analytics.

Note:  Limited to same cluster with shared filesystems.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant