-
-
Notifications
You must be signed in to change notification settings - Fork 3.2k
Parse files in parallel when possible #21175
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 8 commits
8449bb5
0f160f8
24aad4f
93ad7ce
cd85d5d
1879228
e8fbe26
20fcce2
5d2a216
533b0c7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -26,7 +26,8 @@ | |
| import sys | ||
| import time | ||
| import types | ||
| from collections.abc import Callable, Iterator, Mapping, Sequence, Set as AbstractSet | ||
| from collections.abc import Callable, Iterable, Iterator, Mapping, Sequence, Set as AbstractSet | ||
| from concurrent.futures import FIRST_EXCEPTION, ThreadPoolExecutor, wait | ||
| from heapq import heappop, heappush | ||
| from textwrap import dedent | ||
| from typing import ( | ||
|
|
@@ -125,6 +126,7 @@ | |
| from mypy.util import ( | ||
| DecodeError, | ||
| decode_python_encoding, | ||
| get_available_threads, | ||
| get_mypy_comments, | ||
| hash_digest, | ||
| hash_digest_bytes, | ||
|
|
@@ -947,6 +949,60 @@ def dump_stats(self) -> None: | |
| for key, value in sorted(self.stats_summary().items()): | ||
| print(f"{key + ':':24}{value}") | ||
|
|
||
| def parse_all(self, states: Iterable[State]) -> None: | ||
| """Parse multiple files in parallel (if possible) and compute dependencies. | ||
|
|
||
| Note: this duplicates a bit of logic from State.parse_file(). This is done | ||
| as a micro-optimization to parallelize only those parts of the code that | ||
| can be parallelized efficiently. | ||
| """ | ||
| if self.options.native_parser: | ||
| futures = [] | ||
| parsed_states = set() | ||
| available_threads = get_available_threads() | ||
| # Overhead from trying to parallelize (small) blocking portion of | ||
| # parse_file_inner() results in no visible improvement with more than 8 threads. | ||
| with ThreadPoolExecutor(max_workers=min(available_threads, 8)) as executor: | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Reusing the ThreadPoolExecutor could also help scaling a bit, based on some quick microbenchmarks (this is a potential future improvement). |
||
| for state in states: | ||
| state.needs_parse = False | ||
| if state.tree is not None: | ||
| # The file was already parsed. | ||
| continue | ||
| # New parser reads source from file directly, we do this only for | ||
| # the side effect of parsing inline mypy configurations. | ||
| state.get_source() | ||
| if state.id not in self.ast_cache: | ||
| futures.append(executor.submit(state.parse_file_inner, state.source or "")) | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Another potential future improvement/experiment: sending multiple small files in a single |
||
| parsed_states.add(state) | ||
| else: | ||
| self.log(f"Using cached AST for {state.xpath} ({state.id})") | ||
| state.tree, state.early_errors = self.ast_cache[state.id] | ||
| for fut in wait(futures, return_when=FIRST_EXCEPTION).done: | ||
| # This will raise exceptions, if any. | ||
| fut.result() | ||
|
|
||
| for state in states: | ||
| assert state.tree is not None | ||
| if state in parsed_states: | ||
| state.early_errors = list(self.errors.error_info_map.get(state.xpath, [])) | ||
| state.semantic_analysis_pass1() | ||
| self.ast_cache[state.id] = (state.tree, state.early_errors) | ||
| self.modules[state.id] = state.tree | ||
| state.check_blockers() | ||
| state.setup_errors() | ||
| else: | ||
| # Old parser cannot be parallelized. | ||
| for state in states: | ||
| state.parse_file() | ||
|
|
||
| for state in states: | ||
| state.compute_dependencies() | ||
| if self.workers and state.tree: | ||
| # We don't need imports in coordinator process anymore, we parse only to | ||
| # compute dependencies. | ||
| state.tree.imports = [] | ||
| del self.ast_cache[state.id] | ||
|
|
||
| def use_fine_grained_cache(self) -> bool: | ||
| return self.cache_enabled and self.options.use_fine_grained_cache | ||
|
|
||
|
|
@@ -2502,8 +2558,7 @@ def new_state( | |
| # we need to re-calculate dependencies. | ||
| # NOTE: see comment below for why we skip this in fine-grained mode. | ||
| if exist_added_packages(suppressed, manager): | ||
| state.parse_file() # This is safe because the cache is anyway stale. | ||
| state.compute_dependencies() | ||
| state.needs_parse = True # This is safe because the cache is anyway stale. | ||
| # This is an inverse to the situation above. If we had an import like this: | ||
| # from pkg import mod | ||
| # and then mod was deleted, we need to force recompute dependencies, to | ||
|
|
@@ -2512,8 +2567,7 @@ def new_state( | |
| # import pkg | ||
| # import pkg.mod | ||
| if exist_removed_submodules(dependencies, manager): | ||
| state.parse_file() # Same as above, the current state is stale anyway. | ||
| state.compute_dependencies() | ||
| state.needs_parse = True # Same as above, the current state is stale anyway. | ||
| state.size_hint = meta.size | ||
| else: | ||
| # When doing a fine-grained cache load, pretend we only | ||
|
|
@@ -2523,14 +2577,17 @@ def new_state( | |
| manager.log(f"Deferring module to fine-grained update {path} ({id})") | ||
| raise ModuleNotFound | ||
|
|
||
| # Parse the file (and then some) to get the dependencies. | ||
| state.parse_file(temporary=temporary) | ||
| state.compute_dependencies() | ||
| if manager.workers and state.tree: | ||
| # We don't need imports in coordinator process anymore, we parse only to | ||
| # compute dependencies. | ||
| state.tree.imports = [] | ||
| del manager.ast_cache[id] | ||
| if temporary: | ||
| # Eagerly parse temporary states, they are needed rarely. | ||
| state.parse_file(temporary=True) | ||
| state.compute_dependencies() | ||
| if state.manager.workers and state.tree: | ||
| # We don't need imports in coordinator process anymore, we parse only to | ||
| # compute dependencies. | ||
| state.tree.imports = [] | ||
| del state.manager.ast_cache[state.id] | ||
| else: | ||
| state.needs_parse = True | ||
|
|
||
| return state | ||
|
|
||
|
|
@@ -2593,6 +2650,8 @@ def __init__( | |
| # Pre-computed opaque value of suppressed_deps_opts() used | ||
| # to minimize amount of data sent to parallel workers. | ||
| self.known_suppressed_deps_opts: bytes | None = None | ||
| # An internal flag used by build manager to schedule states for parsing. | ||
| self.needs_parse = False | ||
|
|
||
| def write(self, buf: WriteBuffer) -> None: | ||
| """Serialize State for sending to build worker. | ||
|
|
@@ -2816,26 +2875,9 @@ def fix_cross_refs(self) -> None: | |
|
|
||
| # Methods for processing modules from source code. | ||
|
|
||
| def parse_file(self, *, temporary: bool = False, raw_data: FileRawData | None = None) -> None: | ||
| """Parse file and run first pass of semantic analysis. | ||
|
|
||
| Everything done here is local to the file. Don't depend on imported | ||
| modules in any way. Also record module dependencies based on imports. | ||
| """ | ||
| if self.tree is not None: | ||
| # The file was already parsed (in __init__()). | ||
| return | ||
|
|
||
| def get_source(self) -> str: | ||
| """Get module source and parse inline mypy configurations.""" | ||
| manager = self.manager | ||
|
|
||
| # Can we reuse a previously parsed AST? This avoids redundant work in daemon. | ||
| cached = self.id in manager.ast_cache | ||
| modules = manager.modules | ||
| if not cached: | ||
| manager.log(f"Parsing {self.xpath} ({self.id})") | ||
| else: | ||
| manager.log(f"Using cached AST for {self.xpath} ({self.id})") | ||
|
|
||
| t0 = time_ref() | ||
|
|
||
| with self.wrap_context(): | ||
|
|
@@ -2877,33 +2919,53 @@ def parse_file(self, *, temporary: bool = False, raw_data: FileRawData | None = | |
| self.check_for_invalid_options() | ||
|
|
||
| self.size_hint = len(source) | ||
| if not cached: | ||
| ignore_errors = self.ignore_all or self.options.ignore_errors | ||
| self.tree = manager.parse_file( | ||
| self.id, | ||
| self.xpath, | ||
| source, | ||
| ignore_errors=ignore_errors, | ||
| options=self.options, | ||
| raw_data=raw_data, | ||
| ) | ||
| else: | ||
| # Reuse a cached AST | ||
| self.tree = manager.ast_cache[self.id][0] | ||
| self.time_spent_us += time_spent_us(t0) | ||
| return source | ||
|
|
||
| def parse_file_inner(self, source: str, raw_data: FileRawData | None = None) -> None: | ||
| t0 = time_ref() | ||
| self.manager.log(f"Parsing {self.xpath} ({self.id})") | ||
| with self.wrap_context(): | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Error context and the A more efficient approach would be to have It's okay to do something simple at first. We can always improve it later, but it would be good to fix race conditions before merging. Also in
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, indeed there are race conditions. But IIUC the extra |
||
| ignore_errors = self.ignore_all or self.options.ignore_errors | ||
| self.tree = self.manager.parse_file( | ||
| self.id, | ||
| self.xpath, | ||
| source, | ||
| ignore_errors=ignore_errors, | ||
| options=self.options, | ||
| raw_data=raw_data, | ||
| ) | ||
| self.time_spent_us += time_spent_us(t0) | ||
|
|
||
| if not cached: | ||
| def parse_file(self, *, temporary: bool = False, raw_data: FileRawData | None = None) -> None: | ||
| """Parse file and run first pass of semantic analysis. | ||
|
|
||
| Everything done here is local to the file. Don't depend on imported | ||
| modules in any way. Logic here should be kept in sync with BuildManager.parse_all(). | ||
| """ | ||
| self.needs_parse = False | ||
| if self.tree is not None: | ||
| # The file was already parsed. | ||
| return | ||
|
|
||
| source = self.get_source() | ||
| manager = self.manager | ||
| # Can we reuse a previously parsed AST? This avoids redundant work in daemon. | ||
| if self.id not in manager.ast_cache: | ||
| self.parse_file_inner(source, raw_data) | ||
| # Make a copy of any errors produced during parse time so that | ||
| # fine-grained mode can repeat them when the module is | ||
| # reprocessed. | ||
| self.early_errors = list(manager.errors.error_info_map.get(self.xpath, [])) | ||
| self.semantic_analysis_pass1() | ||
| else: | ||
| self.early_errors = manager.ast_cache[self.id][1] | ||
| # Reuse a cached AST | ||
| manager.log(f"Using cached AST for {self.xpath} ({self.id})") | ||
| self.tree, self.early_errors = manager.ast_cache[self.id] | ||
|
|
||
| assert self.tree is not None | ||
| if not temporary: | ||
| modules[self.id] = self.tree | ||
| manager.modules[self.id] = self.tree | ||
| self.check_blockers() | ||
|
|
||
| manager.ast_cache[self.id] = (self.tree, self.early_errors) | ||
|
|
@@ -3075,14 +3137,15 @@ def detect_possibly_undefined_vars(self) -> None: | |
| if manager.errors.is_error_code_enabled( | ||
| codes.POSSIBLY_UNDEFINED | ||
| ) or manager.errors.is_error_code_enabled(codes.USED_BEFORE_DEF): | ||
| self.tree.accept( | ||
| PossiblyUndefinedVariableVisitor( | ||
| MessageBuilder(manager.errors, manager.modules), | ||
| self.type_map(), | ||
| self.options, | ||
| self.tree.names, | ||
| with self.wrap_context(): | ||
| self.tree.accept( | ||
| PossiblyUndefinedVariableVisitor( | ||
| MessageBuilder(manager.errors, manager.modules), | ||
| self.type_map(), | ||
| self.options, | ||
| self.tree.names, | ||
| ) | ||
| ) | ||
| ) | ||
|
|
||
| def finish_passes(self) -> None: | ||
| assert self.tree is not None, "Internal error: method must be called on parsed file only" | ||
|
|
@@ -3304,14 +3367,16 @@ def generate_unused_ignore_notes(self) -> None: | |
| if self.meta and self.options.fine_grained_incremental: | ||
| self.verify_dependencies(suppressed_only=True) | ||
| is_typeshed = self.tree is not None and self.tree.is_typeshed_file(self.options) | ||
| self.manager.errors.generate_unused_ignore_errors(self.xpath, is_typeshed) | ||
| with self.wrap_context(): | ||
| self.manager.errors.generate_unused_ignore_errors(self.xpath, is_typeshed) | ||
|
|
||
| def generate_ignore_without_code_notes(self) -> None: | ||
| if self.manager.errors.is_error_code_enabled(codes.IGNORE_WITHOUT_CODE): | ||
| is_typeshed = self.tree is not None and self.tree.is_typeshed_file(self.options) | ||
| self.manager.errors.generate_ignore_without_code_errors( | ||
| self.xpath, self.options.warn_unused_ignores, is_typeshed | ||
| ) | ||
| with self.wrap_context(): | ||
| self.manager.errors.generate_ignore_without_code_errors( | ||
| self.xpath, self.options.warn_unused_ignores, is_typeshed | ||
| ) | ||
|
|
||
|
|
||
| # Module import and diagnostic glue | ||
|
|
@@ -3616,12 +3681,14 @@ def skipping_ancestor(manager: BuildManager, id: str, path: str, ancestor_for: S | |
| # immediately if it's empty or only contains comments. | ||
| # But beware, some package may be the ancestor of many modules, | ||
| # so we'd need to cache the decision. | ||
| save_import_context = manager.errors.import_context() | ||
| manager.errors.set_import_context([]) | ||
| manager.errors.set_file(ancestor_for.xpath, ancestor_for.id, manager.options) | ||
| manager.error(None, f'Ancestor package "{id}" ignored', only_once=True) | ||
| manager.note( | ||
| None, "(Using --follow-imports=error, submodule passed on command line)", only_once=True | ||
| ) | ||
| manager.errors.set_import_context(save_import_context) | ||
|
|
||
|
|
||
| def log_configuration(manager: BuildManager, sources: list[BuildSource]) -> None: | ||
|
|
@@ -3909,14 +3976,23 @@ def load_graph( | |
| graph[st.id] = st | ||
| new.append(st) | ||
| entry_points.add(bs.module) | ||
| manager.parse_all([state for state in new if state.needs_parse]) | ||
|
|
||
| # Note: Running this each time could be slow in the daemon. If it's a problem, we | ||
| # can do more work to maintain this incrementally. | ||
| seen_files = {st.abspath: st for st in graph.values() if st.path} | ||
|
|
||
| # Collect dependencies. We go breadth-first. | ||
| # More nodes might get added to new as we go, but that's fine. | ||
| ready = set(new) | ||
| not_ready: set[State] = set() | ||
| for st in new: | ||
| if st not in ready: | ||
| # We have run out of states, parse all we have. | ||
| assert st in not_ready | ||
| manager.parse_all(not_ready) | ||
| ready |= not_ready | ||
| not_ready.clear() | ||
| assert st.ancestors is not None | ||
| # Strip out indirect dependencies. These will be dealt with | ||
| # when they show up as direct dependencies, and there's a | ||
|
|
@@ -3972,6 +4048,7 @@ def load_graph( | |
| newst_path = newst.abspath | ||
|
|
||
| if newst_path in seen_files: | ||
| manager.errors.set_file(newst.xpath, newst.id, manager.options) | ||
| manager.error( | ||
| None, | ||
| "Source file found twice under different module names: " | ||
|
|
@@ -3992,6 +4069,10 @@ def load_graph( | |
| assert newst.id not in graph, newst.id | ||
| graph[newst.id] = newst | ||
| new.append(newst) | ||
| if newst.needs_parse: | ||
| not_ready.add(newst) | ||
| else: | ||
| ready.add(newst) | ||
| # There are two things we need to do after the initial load loop. One is up-suppress | ||
| # modules that are back in graph. We need to do this after the loop to cover edge cases | ||
| # like where a namespace package ancestor is shared by a typed and an untyped package. | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`Iterable` is somewhat questionable as a type, since we iterate over `states` more than once. `Container[State]` could be a slightly better type.