Skip to content
200 changes: 141 additions & 59 deletions mypy/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@
import sys
import time
import types
from collections.abc import Callable, Iterator, Mapping, Sequence, Set as AbstractSet
from collections.abc import Callable, Iterable, Iterator, Mapping, Sequence, Set as AbstractSet
from concurrent.futures import FIRST_EXCEPTION, ThreadPoolExecutor, wait
from heapq import heappop, heappush
from textwrap import dedent
from typing import (
Expand Down Expand Up @@ -125,6 +126,7 @@
from mypy.util import (
DecodeError,
decode_python_encoding,
get_available_threads,
get_mypy_comments,
hash_digest,
hash_digest_bytes,
Expand Down Expand Up @@ -947,6 +949,61 @@ def dump_stats(self) -> None:
for key, value in sorted(self.stats_summary().items()):
print(f"{key + ':':24}{value}")

def parse_all(self, states: Iterable[State]) -> None:
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Iterable is somewhat questionable as a type, since we iterate over states more than once. Container[State] could be a slightly better type.

"""Parse multiple files in parallel (if possible) and compute dependencies.

Note: this duplicates a bit of logic from State.parse_file(). This is done
as a micro-optimization to parallelize only those parts of the code that
can be parallelized efficiently.
"""
if self.options.native_parser:
futures = []
parsed_states = set()
# Use at least --num-workers if specified by user.
available_threads = max(get_available_threads(), self.options.num_workers)
# Overhead from trying to parallelize (small) blocking portion of
# parse_file_inner() results in no visible improvement with more than 8 threads.
with ThreadPoolExecutor(max_workers=min(available_threads, 8)) as executor:
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reusing the ThreadPoolExecutor could also help scaling a bit, based on some quick microbenchmarks (this is a potential future improvement).

for state in states:
state.needs_parse = False
if state.tree is not None:
# The file was already parsed.
continue
# New parser reads source from file directly, we do this only for
# the side effect of parsing inline mypy configurations.
state.get_source()
if state.id not in self.ast_cache:
futures.append(executor.submit(state.parse_file_inner, state.source or ""))
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another potential future improvement/experiment: sending multiple small files in a single submit call could improve scaling a bit, but likely only for small files or if the batch is large enough.

parsed_states.add(state)
else:
self.log(f"Using cached AST for {state.xpath} ({state.id})")
state.tree, state.early_errors = self.ast_cache[state.id]
for fut in wait(futures, return_when=FIRST_EXCEPTION).done:
# This will raise exceptions, if any.
fut.result()

for state in states:
assert state.tree is not None
if state in parsed_states:
state.early_errors = list(self.errors.error_info_map.get(state.xpath, []))
state.semantic_analysis_pass1()
self.ast_cache[state.id] = (state.tree, state.early_errors)
self.modules[state.id] = state.tree
state.check_blockers()
state.setup_errors()
else:
# Old parser cannot be parallelized.
for state in states:
state.parse_file()

for state in states:
state.compute_dependencies()
if self.workers and state.tree:
# We don't need imports in coordinator process anymore, we parse only to
# compute dependencies.
state.tree.imports = []
del self.ast_cache[state.id]

def use_fine_grained_cache(self) -> bool:
return self.cache_enabled and self.options.use_fine_grained_cache

Expand Down Expand Up @@ -2502,8 +2559,7 @@ def new_state(
# we need to re-calculate dependencies.
# NOTE: see comment below for why we skip this in fine-grained mode.
if exist_added_packages(suppressed, manager):
state.parse_file() # This is safe because the cache is anyway stale.
state.compute_dependencies()
state.needs_parse = True # This is safe because the cache is anyway stale.
# This is an inverse to the situation above. If we had an import like this:
# from pkg import mod
# and then mod was deleted, we need to force recompute dependencies, to
Expand All @@ -2512,8 +2568,7 @@ def new_state(
# import pkg
# import pkg.mod
if exist_removed_submodules(dependencies, manager):
state.parse_file() # Same as above, the current state is stale anyway.
state.compute_dependencies()
state.needs_parse = True # Same as above, the current state is stale anyway.
state.size_hint = meta.size
else:
# When doing a fine-grained cache load, pretend we only
Expand All @@ -2523,14 +2578,17 @@ def new_state(
manager.log(f"Deferring module to fine-grained update {path} ({id})")
raise ModuleNotFound

# Parse the file (and then some) to get the dependencies.
state.parse_file(temporary=temporary)
state.compute_dependencies()
if manager.workers and state.tree:
# We don't need imports in coordinator process anymore, we parse only to
# compute dependencies.
state.tree.imports = []
del manager.ast_cache[id]
if temporary:
# Eagerly parse temporary states, they are needed rarely.
state.parse_file(temporary=True)
state.compute_dependencies()
if state.manager.workers and state.tree:
# We don't need imports in coordinator process anymore, we parse only to
# compute dependencies.
state.tree.imports = []
del state.manager.ast_cache[state.id]
else:
state.needs_parse = True

return state

Expand Down Expand Up @@ -2593,6 +2651,8 @@ def __init__(
# Pre-computed opaque value of suppressed_deps_opts() used
# to minimize amount of data sent to parallel workers.
self.known_suppressed_deps_opts: bytes | None = None
# An internal flag used by build manager to schedule states for parsing.
self.needs_parse = False

def write(self, buf: WriteBuffer) -> None:
"""Serialize State for sending to build worker.
Expand Down Expand Up @@ -2816,26 +2876,9 @@ def fix_cross_refs(self) -> None:

# Methods for processing modules from source code.

def parse_file(self, *, temporary: bool = False, raw_data: FileRawData | None = None) -> None:
"""Parse file and run first pass of semantic analysis.

Everything done here is local to the file. Don't depend on imported
modules in any way. Also record module dependencies based on imports.
"""
if self.tree is not None:
# The file was already parsed (in __init__()).
return

def get_source(self) -> str:
"""Get module source and parse inline mypy configurations."""
manager = self.manager

# Can we reuse a previously parsed AST? This avoids redundant work in daemon.
cached = self.id in manager.ast_cache
modules = manager.modules
if not cached:
manager.log(f"Parsing {self.xpath} ({self.id})")
else:
manager.log(f"Using cached AST for {self.xpath} ({self.id})")

t0 = time_ref()

with self.wrap_context():
Expand Down Expand Up @@ -2877,33 +2920,53 @@ def parse_file(self, *, temporary: bool = False, raw_data: FileRawData | None =
self.check_for_invalid_options()

self.size_hint = len(source)
if not cached:
ignore_errors = self.ignore_all or self.options.ignore_errors
self.tree = manager.parse_file(
self.id,
self.xpath,
source,
ignore_errors=ignore_errors,
options=self.options,
raw_data=raw_data,
)
else:
# Reuse a cached AST
self.tree = manager.ast_cache[self.id][0]
self.time_spent_us += time_spent_us(t0)
return source

def parse_file_inner(self, source: str, raw_data: FileRawData | None = None) -> None:
t0 = time_ref()
self.manager.log(f"Parsing {self.xpath} ({self.id})")
with self.wrap_context():
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Error context and the Errors object seems to be shared between threads, so there is a race condition. We could perhaps use a separate Errors per thread as a quick fix.

A more efficient approach would be to have parse just return a list of error descriptions, and then the main thread would report any errors via the shared Errors instance. This way we wouldn't need to set up error context for each file unless there are errors (which is rare), so the sequential part would do less work, possibly increasing the maximum possible concurrency.

It's okay to do something simple at first. We can always improve it later, but it would be good to fix race conditions before merging.

Also in BuildManager.parse_file, the add_stats call is racy. In this case we could add a stats_enabled check to quickly skip stats reporting most of the time (again reducing sequential part), and then use a lock around stats updates if stats are enabled.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, indeed there are race conditions. But IIUC the extra wrap_context() calls I added are needed anyway. Without those we can get incorrect error ordering even in sequential mode in some edge cases.

ignore_errors = self.ignore_all or self.options.ignore_errors
self.tree = self.manager.parse_file(
self.id,
self.xpath,
source,
ignore_errors=ignore_errors,
options=self.options,
raw_data=raw_data,
)
self.time_spent_us += time_spent_us(t0)

if not cached:
def parse_file(self, *, temporary: bool = False, raw_data: FileRawData | None = None) -> None:
"""Parse file and run first pass of semantic analysis.

Everything done here is local to the file. Don't depend on imported
modules in any way. Logic here should be kept in sync with BuildManager.parse_all().
"""
self.needs_parse = False
if self.tree is not None:
# The file was already parsed.
return

source = self.get_source()
manager = self.manager
# Can we reuse a previously parsed AST? This avoids redundant work in daemon.
if self.id not in manager.ast_cache:
self.parse_file_inner(source, raw_data)
# Make a copy of any errors produced during parse time so that
# fine-grained mode can repeat them when the module is
# reprocessed.
self.early_errors = list(manager.errors.error_info_map.get(self.xpath, []))
self.semantic_analysis_pass1()
else:
self.early_errors = manager.ast_cache[self.id][1]
# Reuse a cached AST
manager.log(f"Using cached AST for {self.xpath} ({self.id})")
self.tree, self.early_errors = manager.ast_cache[self.id]

assert self.tree is not None
if not temporary:
modules[self.id] = self.tree
manager.modules[self.id] = self.tree
self.check_blockers()

manager.ast_cache[self.id] = (self.tree, self.early_errors)
Expand Down Expand Up @@ -3075,14 +3138,15 @@ def detect_possibly_undefined_vars(self) -> None:
if manager.errors.is_error_code_enabled(
codes.POSSIBLY_UNDEFINED
) or manager.errors.is_error_code_enabled(codes.USED_BEFORE_DEF):
self.tree.accept(
PossiblyUndefinedVariableVisitor(
MessageBuilder(manager.errors, manager.modules),
self.type_map(),
self.options,
self.tree.names,
with self.wrap_context():
self.tree.accept(
PossiblyUndefinedVariableVisitor(
MessageBuilder(manager.errors, manager.modules),
self.type_map(),
self.options,
self.tree.names,
)
)
)

def finish_passes(self) -> None:
assert self.tree is not None, "Internal error: method must be called on parsed file only"
Expand Down Expand Up @@ -3304,14 +3368,16 @@ def generate_unused_ignore_notes(self) -> None:
if self.meta and self.options.fine_grained_incremental:
self.verify_dependencies(suppressed_only=True)
is_typeshed = self.tree is not None and self.tree.is_typeshed_file(self.options)
self.manager.errors.generate_unused_ignore_errors(self.xpath, is_typeshed)
with self.wrap_context():
self.manager.errors.generate_unused_ignore_errors(self.xpath, is_typeshed)

def generate_ignore_without_code_notes(self) -> None:
if self.manager.errors.is_error_code_enabled(codes.IGNORE_WITHOUT_CODE):
is_typeshed = self.tree is not None and self.tree.is_typeshed_file(self.options)
self.manager.errors.generate_ignore_without_code_errors(
self.xpath, self.options.warn_unused_ignores, is_typeshed
)
with self.wrap_context():
self.manager.errors.generate_ignore_without_code_errors(
self.xpath, self.options.warn_unused_ignores, is_typeshed
)


# Module import and diagnostic glue
Expand Down Expand Up @@ -3616,12 +3682,14 @@ def skipping_ancestor(manager: BuildManager, id: str, path: str, ancestor_for: S
# immediately if it's empty or only contains comments.
# But beware, some package may be the ancestor of many modules,
# so we'd need to cache the decision.
save_import_context = manager.errors.import_context()
manager.errors.set_import_context([])
manager.errors.set_file(ancestor_for.xpath, ancestor_for.id, manager.options)
manager.error(None, f'Ancestor package "{id}" ignored', only_once=True)
manager.note(
None, "(Using --follow-imports=error, submodule passed on command line)", only_once=True
)
manager.errors.set_import_context(save_import_context)


def log_configuration(manager: BuildManager, sources: list[BuildSource]) -> None:
Expand Down Expand Up @@ -3909,14 +3977,23 @@ def load_graph(
graph[st.id] = st
new.append(st)
entry_points.add(bs.module)
manager.parse_all([state for state in new if state.needs_parse])

# Note: Running this each time could be slow in the daemon. If it's a problem, we
# can do more work to maintain this incrementally.
seen_files = {st.abspath: st for st in graph.values() if st.path}

# Collect dependencies. We go breadth-first.
# More nodes might get added to new as we go, but that's fine.
ready = set(new)
not_ready: set[State] = set()
for st in new:
if st not in ready:
# We have run out of states, parse all we have.
assert st in not_ready
manager.parse_all(not_ready)
ready |= not_ready
not_ready.clear()
assert st.ancestors is not None
# Strip out indirect dependencies. These will be dealt with
# when they show up as direct dependencies, and there's a
Expand Down Expand Up @@ -3972,6 +4049,7 @@ def load_graph(
newst_path = newst.abspath

if newst_path in seen_files:
manager.errors.set_file(newst.xpath, newst.id, manager.options)
manager.error(
None,
"Source file found twice under different module names: "
Expand All @@ -3992,6 +4070,10 @@ def load_graph(
assert newst.id not in graph, newst.id
graph[newst.id] = newst
new.append(newst)
if newst.needs_parse:
not_ready.add(newst)
else:
ready.add(newst)
# There are two things we need to do after the initial load loop. One is up-suppress
# modules that are back in graph. We need to do this after the loop to cover edge cases
# like where a namespace package ancestor is shared by a typed and an untyped package.
Expand Down
2 changes: 1 addition & 1 deletion mypy/metastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ def close(self) -> None:
def connect_db(db_file: str) -> sqlite3.Connection:
import sqlite3.dbapi2

db = sqlite3.dbapi2.connect(db_file)
db = sqlite3.dbapi2.connect(db_file, check_same_thread=False)
# This is a bit unfortunate (as we may get corrupt cache after e.g. Ctrl + C),
# but without this flag, commits are *very* slow, especially when using HDDs,
# see https://www.sqlite.org/faq.html#q19 for details.
Expand Down
5 changes: 5 additions & 0 deletions mypy/nativeparse.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from __future__ import annotations

import os
import time
from typing import Any, Final, cast

import ast_serialize # type: ignore[import-untyped, import-not-found, unused-ignore]
Expand Down Expand Up @@ -273,6 +274,10 @@ def read_statements(state: State, data: ReadBuffer, n: int) -> list[Statement]:
def parse_to_binary_ast(
filename: str, options: Options, skip_function_bodies: bool = False
) -> tuple[bytes, list[dict[str, Any]], TypeIgnores, bytes, bool, bool]:
# This is a horrible hack to work around a mypyc bug where imported
# module may be not ready in a thread sometimes.
while ast_serialize is None:
time.sleep(0.0001) # type: ignore[unreachable]
ast_bytes, errors, ignores, import_bytes, ast_data = ast_serialize.parse(
filename,
skip_function_bodies=skip_function_bodies,
Expand Down
Loading
Loading