Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
Upcoming (TBD)
==============

Features
---------
* Add option to prefetch completion metadata for some or all schemas
* Save fetched completion metadata when switching schemas


1.69.0 (2026/04/20)
==============

Expand Down
5 changes: 5 additions & 0 deletions mycli/clitoolbar.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ def get_toolbar_tokens() -> list[tuple[str, str]]:
dynamic.append(divider)
dynamic.append(("class:bottom-toolbar", "Refreshing completions…"))

schema_prefetcher = getattr(mycli, 'schema_prefetcher', None)
if schema_prefetcher is not None and schema_prefetcher.is_prefetching():
dynamic.append(divider)
dynamic.append(("class:bottom-toolbar", "Prefetching schemas…"))

if format_string and format_string != r'\B':
if format_string.startswith(r'\B'):
amended_format = format_string[2:]
Expand Down
22 changes: 20 additions & 2 deletions mycli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
from mycli.packages.sqlresult import SQLResult
from mycli.packages.ssh_utils import read_ssh_config
from mycli.packages.tabular_output import sql_format
from mycli.schema_prefetcher import SchemaPrefetcher
from mycli.sqlcompleter import SQLCompleter
from mycli.sqlexecute import FIELD_TYPES, SQLExecute
from mycli.types import Query
Expand Down Expand Up @@ -243,6 +244,8 @@ def __init__(
self.logfile = False

self.completion_refresher = CompletionRefresher()
self.prefetch_schemas_setting = c["main"].get("prefetch_schemas", "") or ""
self.schema_prefetcher = SchemaPrefetcher(self)

self.logger = logging.getLogger(__name__)
self.initialize_logging()
Expand Down Expand Up @@ -301,6 +304,8 @@ def __init__(
special.set_destructive_keywords(self.destructive_keywords)

def close(self) -> None:
if hasattr(self, 'schema_prefetcher'):
self.schema_prefetcher.stop()
if self.sqlexecute is not None:
self.sqlexecute.close()

Expand Down Expand Up @@ -1008,10 +1013,18 @@ def configure_pager(self) -> None:
special.disable_pager()

def refresh_completions(self, reset: bool = False) -> list[SQLResult]:
# Cancel any in-flight schema prefetch before the completer is
# replaced. Loaded-schema bookkeeping is intentionally preserved
# so switching between already-loaded schemas does not re-fetch.
self.schema_prefetcher.stop()

assert self.sqlexecute is not None
if reset:
# Update the active completer's current-schema pointer right
# away so unqualified completions reflect a schema switch
# even before the background refresh finishes.
with self._completer_lock:
self.completer.reset_completions()
assert self.sqlexecute is not None
self.completer.set_dbname(self.sqlexecute.dbname)
self.completion_refresher.refresh(
self.sqlexecute,
self._on_completions_refreshed,
Expand All @@ -1027,13 +1040,18 @@ def refresh_completions(self, reset: bool = False) -> list[SQLResult]:
def _on_completions_refreshed(self, new_completer: SQLCompleter) -> None:
"""Swap the completer object in cli with the newly created completer."""
with self._completer_lock:
new_completer.copy_other_schemas_from(self.completer, exclude=new_completer.dbname)
self.completer = new_completer

if self.prompt_session:
# After refreshing, redraw the CLI to clear the statusbar
# "Refreshing completions..." indicator
self.prompt_session.app.invalidate()

# Kick off background prefetch for any extra schemas configured
# via ``prefetch_schemas`` so users get cross-schema completions.
self.schema_prefetcher.start_configured()

def run_query(
self,
query: str,
Expand Down
8 changes: 8 additions & 0 deletions mycli/myclirc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,14 @@ smart_completion = True
# Suggestion: 3.
min_completion_trigger = 1

# Comma-separated list of schemas to prefetch for auto-completion.
# Prefetching starts in the background after launch.
# Examples:
# (empty) = default, disables prefetching
# schema1,schema2 = enables prefetch for given schemas
# all = enables prefetch for all schemas
prefetch_schemas =

# Multi-line mode allows breaking up the sql statements into multiple lines. If
# this is set to True, then the end of the statements must have a semi-colon.
# If this is set to False then sql statements can't be split into multiple
Expand Down
232 changes: 232 additions & 0 deletions mycli/schema_prefetcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
"""Background prefetcher for multi-schema auto-completion.

The default completion refresher only populates metadata for the
currently-selected schema. ``SchemaPrefetcher`` loads metadata for
additional schemas on a background thread so that users can get
qualified auto-completion suggestions (``OtherSchema.table``) without
switching databases first.
"""

from __future__ import annotations

import logging
import threading
from typing import TYPE_CHECKING, Any, Iterable

from mycli.sqlexecute import SQLExecute

if TYPE_CHECKING: # pragma: no cover - typing only
from mycli.main import MyCli
from mycli.sqlcompleter import SQLCompleter

_logger = logging.getLogger(__name__)

ALL_SCHEMAS_SENTINEL = 'all'


def parse_prefetch_setting(raw: str | None) -> list[str] | None:
"""Parse the ``prefetch_schemas`` option value.

Returns ``None`` when the user wants every accessible schema
(``all``), a list of explicit schema names otherwise, or an empty
list when prefetching is disabled.
"""
if not raw:
return []
value = raw.strip()
if not value:
return []
if value.lower() == ALL_SCHEMAS_SENTINEL:
return None
return [part.strip() for part in value.split(',') if part.strip()]


class SchemaPrefetcher:
"""Run schema prefetch work on a dedicated background thread."""

def __init__(self, mycli: 'MyCli') -> None:
self.mycli = mycli
self._thread: threading.Thread | None = None
self._cancel = threading.Event()
self._loaded: set[str] = set()

def is_prefetching(self) -> bool:
return bool(self._thread and self._thread.is_alive())

def clear_loaded(self) -> None:
"""Forget which schemas have been prefetched (used on reset)."""
self._loaded.clear()

def stop(self, timeout: float = 2.0) -> None:
"""Signal the background thread to stop and wait briefly for it."""
if self._thread and self._thread.is_alive():
self._cancel.set()
self._thread.join(timeout=timeout)
self._cancel = threading.Event()
self._thread = None

def start_configured(self) -> None:
"""Start prefetching based on the user's ``prefetch_schemas`` setting."""
setting = getattr(self.mycli, 'prefetch_schemas_setting', '')
parsed = parse_prefetch_setting(setting)
if parsed is None:
self._start(self._resolve_all_schemas())
else:
self._start(parsed)

def prefetch_schema_now(self, schema: str) -> None:
"""Fetch *schema* immediately on a background thread.

Used when a user manually switches to a schema. The method
returns quickly; the actual work happens in the new thread.
"""
if not schema:
return
# Avoid double-fetching while a full-prefetch pass is running.
self.stop()
self._start([schema])

def _start(self, schemas: Iterable[str]) -> None:
self.stop()
current = self._current_schema()
existing = set(self.mycli.completer.dbmetadata.get('tables', {}).keys())
queue = [s for s in schemas if s and s != current and s not in self._loaded and s not in existing]
if not queue:
self._invalidate_app()
return
self._cancel = threading.Event()
self._thread = threading.Thread(
target=self._run,
args=(queue,),
name='schema_prefetcher',
daemon=True,
)
self._thread.start()
self._invalidate_app()

def _run(self, schemas: list[str]) -> None:
executor: SQLExecute | None = None
try:
executor = self._make_executor()
except Exception as e: # pragma: no cover - defensive
_logger.error('schema prefetch could not open connection: %r', e)
self._invalidate_app()
return
try:
for schema in schemas:
if self._cancel.is_set():
return
try:
self._prefetch_one(executor, schema)
self._loaded.add(schema)
except Exception as e:
_logger.error('prefetch failed for schema %r: %r', schema, e)
finally:
try:
executor.close()
except Exception: # pragma: no cover - defensive
pass
self._invalidate_app()

def _prefetch_one(self, executor: SQLExecute, schema: str) -> None:
_logger.debug('prefetching schema %r', schema)
table_rows = list(executor.table_columns(schema=schema))
fk_rows = list(executor.foreign_keys(schema=schema))
enum_rows = list(executor.enum_values(schema=schema))
func_rows = list(executor.functions(schema=schema))
proc_rows = list(executor.procedures(schema=schema))

# Use the live completer's escape logic so keys match what the
# completion engine computes when parsing user input.
completer = self.mycli.completer
table_columns: dict[str, list[str]] = {}
for table, column in table_rows:
esc_table = completer.escape_name(table)
esc_col = completer.escape_name(column)
cols = table_columns.setdefault(esc_table, ['*'])
cols.append(esc_col)

fk_tables: dict[str, set[str]] = {}
fk_relations: list[tuple[str, str, str, str]] = []
for table, col, ref_table, ref_col in fk_rows:
esc_table = completer.escape_name(table)
esc_col = completer.escape_name(col)
esc_ref_table = completer.escape_name(ref_table)
esc_ref_col = completer.escape_name(ref_col)
fk_tables.setdefault(esc_table, set()).add(esc_ref_table)
fk_tables.setdefault(esc_ref_table, set()).add(esc_table)
fk_relations.append((esc_table, esc_col, esc_ref_table, esc_ref_col))
fk_payload: dict[str, Any] = {'tables': fk_tables, 'relations': fk_relations}

enum_values: dict[str, dict[str, list[str]]] = {}
for table, column, values in enum_rows:
esc_table = completer.escape_name(table)
esc_col = completer.escape_name(column)
enum_values.setdefault(esc_table, {})[esc_col] = list(values)

functions: dict[str, None] = {}
for row in func_rows:
if not row or not row[0]:
continue
functions[completer.escape_name(row[0])] = None

procedures: dict[str, None] = {}
for row in proc_rows:
if not row or not row[0]:
continue
procedures[completer.escape_name(row[0])] = None

with self.mycli._completer_lock:
live_completer: 'SQLCompleter' = self.mycli.completer
live_completer.load_schema_metadata(
schema=schema,
table_columns=table_columns,
foreign_keys=fk_payload,
enum_values=enum_values,
functions=functions,
procedures=procedures,
)
self._invalidate_app()

def _resolve_all_schemas(self) -> list[str]:
sqlexecute = self.mycli.sqlexecute
if sqlexecute is None:
return []
try:
return list(sqlexecute.databases())
except Exception as e:
_logger.error('failed to list databases for prefetch: %r', e)
return []

def _current_schema(self) -> str | None:
sqlexecute = self.mycli.sqlexecute
return sqlexecute.dbname if sqlexecute is not None else None

def _make_executor(self) -> SQLExecute:
sqlexecute = self.mycli.sqlexecute
assert sqlexecute is not None
return SQLExecute(
sqlexecute.dbname,
sqlexecute.user,
sqlexecute.password,
sqlexecute.host,
sqlexecute.port,
sqlexecute.socket,
sqlexecute.character_set,
sqlexecute.local_infile,
sqlexecute.ssl,
sqlexecute.ssh_user,
sqlexecute.ssh_host,
sqlexecute.ssh_port,
sqlexecute.ssh_password,
sqlexecute.ssh_key_filename,
)

def _invalidate_app(self) -> None:
prompt_session = getattr(self.mycli, 'prompt_session', None)
if prompt_session is None:
return
try:
prompt_session.app.invalidate()
except Exception: # pragma: no cover - defensive
pass
Loading
Loading