diff --git a/docs/en/antalya/swarm.md b/docs/en/antalya/swarm.md
new file mode 100644
index 000000000000..a26f9de26e0a
--- /dev/null
+++ b/docs/en/antalya/swarm.md
@@ -0,0 +1,73 @@
+# Antalya branch
+
+## Swarm
+
+### Difference with upstream version
+
+#### `storage_type` argument in object storage functions
+
+In upstream ClickHouse, there are several table functions to read Iceberg tables from different storage backends such as `icebergLocal`, `icebergS3`, `icebergAzure`, `icebergHDFS`, cluster variants, the `iceberg` function as a synonym for `icebergS3`, and table engines like `IcebergLocal`, `IcebergS3`, `IcebergAzure`, `IcebergHDFS`.
+
+In the Antalya branch, the `iceberg` table function and the `Iceberg` table engine unify all variants into one by using a new named argument, `storage_type`, which can be one of `local`, `s3`, `azure`, or `hdfs`.
+
+Old syntax examples:
+
+```sql
+SELECT * FROM icebergS3('http://minio1:9000/root/table_data', 'minio', 'minio123', 'Parquet');
+SELECT * FROM icebergAzureCluster('mycluster', 'http://azurite1:30000/devstoreaccount1', 'cont', '/table_data', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'Parquet');
+CREATE TABLE mytable ENGINE=IcebergHDFS('/table_data', 'Parquet');
+```
+
+New syntax examples:
+
+```sql
+SELECT * FROM iceberg(storage_type='s3', 'http://minio1:9000/root/table_data', 'minio', 'minio123', 'Parquet');
+SELECT * FROM icebergCluster('mycluster', storage_type='azure', 'http://azurite1:30000/devstoreaccount1', 'cont', '/table_data', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'Parquet');
+CREATE TABLE mytable ENGINE=Iceberg('/table_data', 'Parquet', storage_type='hdfs');
+```
+
+Also, if a named collection is used to store access parameters, the field `storage_type` can be included in the same named collection:
+
+```xml
+
+
+ http://minio1:9001/root/
+ minio
+ minio123
+ s3
+
+
+```
+
+```sql
+SELECT * FROM iceberg(s3, filename='table_data');
+```
+
+By default `storage_type` is `'s3'` to maintain backward compatibility.
+
+
+#### `object_storage_cluster` setting
+
+The new setting `object_storage_cluster` controls whether a single-node or cluster variant of table functions reading from object storage (e.g., `s3`, `azure`, `iceberg`, and their cluster variants like `s3Cluster`, `azureCluster`, `icebergCluster`) is used.
+
+Old syntax examples:
+
+```sql
+SELECT * from s3Cluster('myCluster', 'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
+ 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))');
+SELECT * FROM icebergAzureCluster('mycluster', 'http://azurite1:30000/devstoreaccount1', 'cont', '/table_data', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'Parquet');
+```
+
+New syntax examples:
+
+```sql
+SELECT * from s3('http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
+ 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
+ SETTINGS object_storage_cluster='myCluster';
+SELECT * FROM icebergAzure('http://azurite1:30000/devstoreaccount1', 'cont', '/table_data', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'Parquet')
+ SETTINGS object_storage_cluster='myCluster';
+```
+
+This setting also applies to table engines and can be used with tables managed by Iceberg Catalog.
+
+Note: The upstream ClickHouse has introduced analogous settings, such as `parallel_replicas_for_cluster_engines` and `cluster_for_parallel_replicas`. Since version 25.10, these settings work with table engines. It is possible that in the future, the `object_storage_cluster` setting will be deprecated.
diff --git a/docs/en/engines/table-engines/integrations/iceberg.md b/docs/en/engines/table-engines/integrations/iceberg.md
index af50e1099e4d..532d2f0eef31 100644
--- a/docs/en/engines/table-engines/integrations/iceberg.md
+++ b/docs/en/engines/table-engines/integrations/iceberg.md
@@ -324,6 +324,62 @@ SETTINGS iceberg_metadata_staleness_ms=120000
**Note**: Current expectation is that metadata cache size is sufficient to hold the latest metadata snapshot in full for all active tables, if asynchronous prefetching is enabled.
+## Altinity Antalya branch
+
+### Specify storage type in arguments
+
+Only in the Altinity Antalya branch does the `Iceberg` table engine support all storage types. The storage type can be specified using the named argument `storage_type`. Supported values are `s3`, `azure`, `hdfs`, and `local`.
+
+```sql
+CREATE TABLE iceberg_table_s3
+ ENGINE = Iceberg(storage_type='s3', url, [, NOSIGN | access_key_id, secret_access_key, [session_token]], format, [,compression])
+
+CREATE TABLE iceberg_table_azure
+ ENGINE = Iceberg(storage_type='azure', connection_string|storage_account_url, container_name, blobpath, [account_name, account_key, format, compression])
+
+CREATE TABLE iceberg_table_hdfs
+ ENGINE = Iceberg(storage_type='hdfs', path_to_table, [,format] [,compression_method])
+
+CREATE TABLE iceberg_table_local
+ ENGINE = Iceberg(storage_type='local', path_to_table, [,format] [,compression_method])
+```
+
+### Specify storage type in named collection
+
+Only in the Altinity Antalya branch can `storage_type` be included as part of a named collection. This allows for centralized configuration of storage settings.
+
+```xml
+
+
+
+ http://test.s3.amazonaws.com/clickhouse-bucket/
+ test
+ test
+ auto
+ auto
+ s3
+
+
+
+```
+
+```sql
+CREATE TABLE iceberg_table ENGINE=Iceberg(iceberg_conf, filename = 'test_table')
+```
+
+The default value for `storage_type` is `s3`.
+
+### The `object_storage_cluster` setting.
+
+Only in the Altinity Antalya branch is an alternative syntax for the `Iceberg` table engine available. This syntax allows execution on a cluster when the `object_storage_cluster` setting is non-empty and contains the cluster name.
+
+```sql
+CREATE TABLE iceberg_table_s3
+ ENGINE = Iceberg(storage_type='s3', url, [, NOSIGN | access_key_id, secret_access_key, [session_token]], format, [,compression]);
+
+SELECT * FROM iceberg_table_s3 SETTINGS object_storage_cluster='cluster_simple';
+```
+
## See also {#see-also}
- [iceberg table function](/sql-reference/table-functions/iceberg.md)
diff --git a/docs/en/sql-reference/distribution-on-cluster.md b/docs/en/sql-reference/distribution-on-cluster.md
new file mode 100644
index 000000000000..3a9835e23856
--- /dev/null
+++ b/docs/en/sql-reference/distribution-on-cluster.md
@@ -0,0 +1,23 @@
+# Task distribution in *Cluster family functions
+
+## Task distribution algorithm
+
+Table functions such as `s3Cluster`, `azureBlobStorageCluster`, `hdfsCluster`, `icebergCluster`, and table engines like `S3`, `Azure`, `HDFS`, `Iceberg` with the setting `object_storage_cluster` distribute tasks across all cluster nodes or a subset limited by the `object_storage_max_nodes` setting. This setting limits the number of nodes involved in processing a distributed query, randomly selecting nodes for each query.
+
+A single task corresponds to processing one source file.
+
+For each file, one cluster node is selected as the primary node using a consistent Rendezvous Hashing algorithm. This algorithm guarantees that:
+ * The same node is consistently selected as primary for each file, as long as the cluster remains unchanged.
+ * When the cluster changes (nodes added or removed), only files assigned to those affected nodes change their primary node assignment.
+
+This improves cache efficiency by minimizing data movement among nodes.
+
+## `lock_object_storage_task_distribution_ms` setting
+
+Each node begins processing files for which it is the primary node. After completing its assigned files, a node may take tasks from other nodes, either immediately or after waiting for `lock_object_storage_task_distribution_ms` milliseconds if the primary node does not request new files during that interval. The default value of `lock_object_storage_task_distribution_ms` is 500 milliseconds. This setting balances between caching efficiency and workload redistribution when nodes are imbalanced.
+
+## `SYSTEM STOP SWARM MODE` command
+
+If a node needs to shut down gracefully, the command `SYSTEM STOP SWARM MODE` prevents the node from receiving new tasks for *Cluster-family queries. The node finishes processing already assigned files before it can safely shut down without errors.
+
+Receiving new tasks can be resumed with the command `SYSTEM START SWARM MODE`.
diff --git a/docs/en/sql-reference/table-functions/azureBlobStorageCluster.md b/docs/en/sql-reference/table-functions/azureBlobStorageCluster.md
index 4db1dbb594c6..b67ea0efe2e2 100644
--- a/docs/en/sql-reference/table-functions/azureBlobStorageCluster.md
+++ b/docs/en/sql-reference/table-functions/azureBlobStorageCluster.md
@@ -54,6 +54,20 @@ SELECT count(*) FROM azureBlobStorageCluster(
See [azureBlobStorage](/sql-reference/table-functions/azureBlobStorage#using-shared-access-signatures-sas-sas-tokens) for examples.
+## Altinity Antalya branch
+
+### `object_storage_cluster` setting.
+
+Only in the Altinity Antalya branch is an alternative syntax for the `azureBlobStorageCluster` table function available. This allows the `azureBlobStorage` function to be used with the non-empty `object_storage_cluster` setting, specifying a cluster name. This enables distributed queries over Azure Blob Storage across a ClickHouse cluster.
+
+```sql
+SELECT count(*) FROM azureBlobStorage(
+ 'http://azurite1:10000/devstoreaccount1', 'testcontainer', 'test_cluster_count.csv', 'devstoreaccount1',
+ 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV',
+ 'auto', 'key UInt64')
+SETTINGS object_storage_cluster='cluster_simple'
+```
+
## Related {#related}
- [AzureBlobStorage engine](../../engines/table-engines/integrations/azureBlobStorage.md)
diff --git a/docs/en/sql-reference/table-functions/deltalakeCluster.md b/docs/en/sql-reference/table-functions/deltalakeCluster.md
index f01b40d5ce6f..865399172ff3 100644
--- a/docs/en/sql-reference/table-functions/deltalakeCluster.md
+++ b/docs/en/sql-reference/table-functions/deltalakeCluster.md
@@ -45,6 +45,17 @@ A table with the specified structure for reading data from cluster in the specif
- `_time` — Last modified time of the file. Type: `Nullable(DateTime)`. If the time is unknown, the value is `NULL`.
- `_etag` — The etag of the file. Type: `LowCardinality(String)`. If the etag is unknown, the value is `NULL`.
+## Altinity Antalya branch
+
+### `object_storage_cluster` setting.
+
+Only in the Altinity Antalya branch is an alternative syntax for the `deltaLakeCluster` table function available. This allows the `deltaLake` function to be used with the non-empty `object_storage_cluster` setting, specifying a cluster name. This enables distributed queries over Delta Lake Storage across a ClickHouse cluster.
+
+```sql
+SELECT count(*) FROM deltaLake(url [,aws_access_key_id, aws_secret_access_key] [,format] [,structure] [,compression])
+SETTINGS object_storage_cluster='cluster_simple'
+```
+
## Related {#related}
- [deltaLake engine](engines/table-engines/integrations/deltalake.md)
diff --git a/docs/en/sql-reference/table-functions/hdfsCluster.md b/docs/en/sql-reference/table-functions/hdfsCluster.md
index 74a526a2de7b..bfc4ab30fd5b 100644
--- a/docs/en/sql-reference/table-functions/hdfsCluster.md
+++ b/docs/en/sql-reference/table-functions/hdfsCluster.md
@@ -60,6 +60,18 @@ FROM hdfsCluster('cluster_simple', 'hdfs://hdfs1:9000/{some,another}_dir/*', 'TS
If your listing of files contains number ranges with leading zeros, use the construction with braces for each digit separately or use `?`.
:::
+## Altinity Antalya branch
+
+### `object_storage_cluster` setting.
+
+Only in the Altinity Antalya branch is an alternative syntax for the `hdfsCluster` table function available. This allows the `hdfs` function to be used with the non-empty `object_storage_cluster` setting, specifying a cluster name. This enables distributed queries over HDFS Storage across a ClickHouse cluster.
+
+```sql
+SELECT count(*)
+FROM hdfs('hdfs://hdfs1:9000/{some,another}_dir/*', 'TSV', 'name String, value UInt32')
+SETTINGS object_storage_cluster='cluster_simple'
+```
+
## Related {#related}
- [HDFS engine](../../engines/table-engines/integrations/hdfs.md)
diff --git a/docs/en/sql-reference/table-functions/hudiCluster.md b/docs/en/sql-reference/table-functions/hudiCluster.md
index 3f44a369d062..1087ef51cb84 100644
--- a/docs/en/sql-reference/table-functions/hudiCluster.md
+++ b/docs/en/sql-reference/table-functions/hudiCluster.md
@@ -43,6 +43,18 @@ A table with the specified structure for reading data from cluster in the specif
- `_time` — Last modified time of the file. Type: `Nullable(DateTime)`. If the time is unknown, the value is `NULL`.
- `_etag` — The etag of the file. Type: `LowCardinality(String)`. If the etag is unknown, the value is `NULL`.
+## Altinity Antalya branch
+
+### `object_storage_cluster` setting.
+
+Only in the Altinity Antalya branch is an alternative syntax for the `hudiCluster` table function available. This allows the `hudi` function to be used with the non-empty `object_storage_cluster` setting, specifying a cluster name. This enables distributed queries over Hudi Storage across a ClickHouse cluster.
+
+```sql
+SELECT *
+FROM hudi(url [,aws_access_key_id, aws_secret_access_key] [,format] [,structure] [,compression])
+SETTINGS object_storage_cluster='cluster_simple'
+```
+
## Related {#related}
- [Hudi engine](engines/table-engines/integrations/hudi.md)
diff --git a/docs/en/sql-reference/table-functions/iceberg.md b/docs/en/sql-reference/table-functions/iceberg.md
index a4917c286e7e..c0c27b384429 100644
--- a/docs/en/sql-reference/table-functions/iceberg.md
+++ b/docs/en/sql-reference/table-functions/iceberg.md
@@ -649,6 +649,47 @@ GRANT ALTER TABLE ON my_iceberg_table TO my_user;
- The catalog's own authorization (REST catalog auth, AWS Glue IAM, etc.) is enforced independently when ClickHouse updates the metadata
:::
+## Altinity Antalya branch
+
+### Specify storage type in arguments
+
+Only in the Altinity Antalya branch does the `iceberg` table function support all storage types. The storage type can be specified using the named argument `storage_type`. Supported values are `s3`, `azure`, `hdfs`, and `local`.
+
+```sql
+iceberg(storage_type='s3', url [, NOSIGN | access_key_id, secret_access_key, [session_token]] [,format] [,compression_method])
+
+iceberg(storage_type='azure', connection_string|storage_account_url, container_name, blobpath, [,account_name], [,account_key] [,format] [,compression_method])
+
+iceberg(storage_type='hdfs', path_to_table, [,format] [,compression_method])
+
+iceberg(storage_type='local', path_to_table, [,format] [,compression_method])
+```
+
+### Specify storage type in named collection
+
+Only in the Altinity Antalya branch can `storage_type` be included as part of a named collection. This allows for centralized configuration of storage settings.
+
+```xml
+
+
+
+ http://test.s3.amazonaws.com/clickhouse-bucket/
+ test
+ test
+ auto
+ auto
+ s3
+
+
+
+```
+
+```sql
+iceberg(named_collection[, option=value [,..]])
+```
+
+The default value for `storage_type` is `s3`.
+
## See Also {#see-also}
* [Iceberg engine](/engines/table-engines/integrations/iceberg.md)
diff --git a/docs/en/sql-reference/table-functions/icebergCluster.md b/docs/en/sql-reference/table-functions/icebergCluster.md
index d3ce33579d3e..f1db4c4f44ac 100644
--- a/docs/en/sql-reference/table-functions/icebergCluster.md
+++ b/docs/en/sql-reference/table-functions/icebergCluster.md
@@ -50,6 +50,81 @@ SELECT * FROM icebergS3Cluster('cluster_simple', 'http://test.s3.amazonaws.com/c
- `_time` — Last modified time of the file. Type: `Nullable(DateTime)`. If the time is unknown, the value is `NULL`.
- `_etag` — The etag of the file. Type: `LowCardinality(String)`. If the etag is unknown, the value is `NULL`.
+## Altinity Antalya branch
+
+### `icebergLocalCluster` table function
+
+Only in the Altinity Antalya branch, `icebergLocalCluster` is designed to run distributed cluster queries when Iceberg data is stored on shared network storage mounted with a local path. The path must be identical on all replicas.
+
+```sql
+icebergLocalCluster(cluster_name, path_to_table, [,format] [,compression_method])
+```
+
+### Specify storage type in function arguments
+
+Only in the Altinity Antalya branch, the `icebergCluster` table function supports all storage backends. The storage backend can be specified using the named argument `storage_type`. Valid values include `s3`, `azure`, `hdfs`, and `local`.
+
+```sql
+icebergCluster(storage_type='s3', cluster_name, url [, NOSIGN | access_key_id, secret_access_key, [session_token]] [,format] [,compression_method])
+
+icebergCluster(storage_type='azure', cluster_name, connection_string|storage_account_url, container_name, blobpath, [,account_name], [,account_key] [,format] [,compression_method])
+
+icebergCluster(storage_type='hdfs', cluster_name, path_to_table, [,format] [,compression_method])
+
+icebergCluster(storage_type='local', cluster_name, path_to_table, [,format] [,compression_method])
+```
+
+### Specify storage type in a named collection
+
+Only in the Altinity Antalya branch, `storage_type` can be part of a named collection.
+
+```xml
+
+
+
+ http://test.s3.amazonaws.com/clickhouse-bucket/
+ test
+ test
+ auto
+ auto
+ s3
+
+
+
+```
+
+```sql
+icebergCluster(iceberg_conf[, option=value [,..]])
+```
+
+The default value for `storage_type` is `s3`.
+
+### `object_storage_cluster` setting.
+
+Only in the Altinity Antalya branch, an alternative syntax for the `icebergCluster` table function is available. This allows the `iceberg` function to be used with the non-empty `object_storage_cluster` setting, specifying a cluster name. This enables distributed queries over Iceberg tables across a ClickHouse cluster.
+
+```sql
+icebergS3(url [, NOSIGN | access_key_id, secret_access_key, [session_token]] [,format] [,compression_method]) SETTINGS object_storage_cluster='cluster_name'
+
+icebergAzure(connection_string|storage_account_url, container_name, blobpath, [,account_name], [,account_key] [,format] [,compression_method]) SETTINGS object_storage_cluster='cluster_name'
+
+icebergHDFS(path_to_table, [,format] [,compression_method]) SETTINGS object_storage_cluster='cluster_name'
+
+icebergLocal(path_to_table, [,format] [,compression_method]) SETTINGS object_storage_cluster='cluster_name'
+
+icebergS3(option=value [,..]) SETTINGS object_storage_cluster='cluster_name'
+
+iceberg(storage_type='s3', url [, NOSIGN | access_key_id, secret_access_key, [session_token]] [,format] [,compression_method]) SETTINGS object_storage_cluster='cluster_name'
+
+iceberg(storage_type='azure', connection_string|storage_account_url, container_name, blobpath, [,account_name], [,account_key] [,format] [,compression_method]) SETTINGS object_storage_cluster='cluster_name'
+
+iceberg(storage_type='hdfs', path_to_table, [,format] [,compression_method]) SETTINGS object_storage_cluster='cluster_name'
+
+iceberg(storage_type='local', path_to_table, [,format] [,compression_method]) SETTINGS object_storage_cluster='cluster_name'
+
+iceberg(iceberg_conf[, option=value [,..]]) SETTINGS object_storage_cluster='cluster_name'
+```
+
**See Also**
- [Iceberg engine](/engines/table-engines/integrations/iceberg.md)
diff --git a/docs/en/sql-reference/table-functions/s3Cluster.md b/docs/en/sql-reference/table-functions/s3Cluster.md
index 2e6af0273ba0..f0cce77a0b49 100644
--- a/docs/en/sql-reference/table-functions/s3Cluster.md
+++ b/docs/en/sql-reference/table-functions/s3Cluster.md
@@ -91,6 +91,23 @@ Users can use the same approaches as document for the s3 function [here](/sql-re
For details on optimizing the performance of the s3 function see [our detailed guide](/integrations/s3/performance).
+## Altinity Antalya branch
+
+### `object_storage_cluster` setting.
+
+Only in the Altinity Antalya branch is an alternative syntax for the `s3Cluster` table function available. This allows the `s3` function to be used with the non-empty `object_storage_cluster` setting, specifying a cluster name. This enables distributed queries over S3 Storage across a ClickHouse cluster.
+
+```sql
+SELECT * FROM s3(
+ 'http://minio1:9001/root/data/{clickhouse,database}/*',
+ 'minio',
+ 'ClickHouse_Minio_P@ssw0rd',
+ 'CSV',
+ 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))'
+) ORDER BY (name, value, polygon)
+SETTINGS object_storage_cluster='cluster_simple'
+```
+
## Related {#related}
- [S3 engine](../../engines/table-engines/integrations/s3.md)
diff --git a/src/Analyzer/FunctionNode.cpp b/src/Analyzer/FunctionNode.cpp
index 306f64db3bae..52ef493021fd 100644
--- a/src/Analyzer/FunctionNode.cpp
+++ b/src/Analyzer/FunctionNode.cpp
@@ -12,6 +12,7 @@
#include
#include
+#include
#include
@@ -164,6 +165,13 @@ void FunctionNode::dumpTreeImpl(WriteBuffer & buffer, FormatState & format_state
buffer << '\n' << std::string(indent + 2, ' ') << "WINDOW\n";
getWindowNode()->dumpTreeImpl(buffer, format_state, indent + 4);
}
+
+ if (!settings_changes.empty())
+ {
+ buffer << '\n' << std::string(indent + 2, ' ') << "SETTINGS";
+ for (const auto & change : settings_changes)
+ buffer << fmt::format(" {}={}", change.name, fieldToString(change.value));
+ }
}
bool FunctionNode::isEqualImpl(const IQueryTreeNode & rhs, CompareOptions compare_options) const
@@ -171,7 +179,7 @@ bool FunctionNode::isEqualImpl(const IQueryTreeNode & rhs, CompareOptions compar
const auto & rhs_typed = assert_cast(rhs);
if (function_name != rhs_typed.function_name || isAggregateFunction() != rhs_typed.isAggregateFunction()
|| isOrdinaryFunction() != rhs_typed.isOrdinaryFunction() || isWindowFunction() != rhs_typed.isWindowFunction()
- || nulls_action != rhs_typed.nulls_action)
+ || nulls_action != rhs_typed.nulls_action || settings_changes != rhs_typed.settings_changes)
return false;
/// is_operator is ignored here because it affects only AST formatting
@@ -206,6 +214,17 @@ void FunctionNode::updateTreeHashImpl(HashState & hash_state, CompareOptions com
hash_state.update(isWindowFunction());
hash_state.update(nulls_action);
+ hash_state.update(settings_changes.size());
+ for (const auto & change : settings_changes)
+ {
+ hash_state.update(change.name.size());
+ hash_state.update(change.name);
+
+ const auto & value_dump = change.value.dump();
+ hash_state.update(value_dump.size());
+ hash_state.update(value_dump);
+ }
+
/// is_operator is ignored here because it affects only AST formatting
if (!compare_options.compare_types)
@@ -230,6 +249,7 @@ QueryTreeNodePtr FunctionNode::cloneImpl() const
result_function->nulls_action = nulls_action;
result_function->wrap_with_nullable = wrap_with_nullable;
result_function->is_operator = is_operator;
+ result_function->settings_changes = settings_changes;
return result_function;
}
@@ -292,6 +312,14 @@ ASTPtr FunctionNode::toASTImpl(const ConvertToASTOptions & options) const
function_ast->window_definition = window_node->toAST(new_options);
}
+ if (!settings_changes.empty())
+ {
+ auto settings_ast = make_intrusive();
+ settings_ast->changes = settings_changes;
+ settings_ast->is_standalone = false;
+ function_ast->arguments->children.push_back(settings_ast);
+ }
+
return function_ast;
}
diff --git a/src/Analyzer/FunctionNode.h b/src/Analyzer/FunctionNode.h
index c0005016def6..0ec99c9ab40c 100644
--- a/src/Analyzer/FunctionNode.h
+++ b/src/Analyzer/FunctionNode.h
@@ -10,6 +10,7 @@
#include
#include
#include
+#include
namespace DB
{
@@ -204,6 +205,18 @@ class FunctionNode final : public IQueryTreeNode
wrap_with_nullable = true;
}
+ /// Get settings changes passed to table function
+ const SettingsChanges & getSettingsChanges() const
+ {
+ return settings_changes;
+ }
+
+ /// Set settings changes passed as last argument to table function
+ void setSettingsChanges(SettingsChanges settings_changes_)
+ {
+ settings_changes = std::move(settings_changes_);
+ }
+
void dumpTreeImpl(WriteBuffer & buffer, FormatState & format_state, size_t indent) const override;
protected:
@@ -228,6 +241,8 @@ class FunctionNode final : public IQueryTreeNode
static constexpr size_t arguments_child_index = 1;
static constexpr size_t window_child_index = 2;
static constexpr size_t children_size = window_child_index + 1;
+
+ SettingsChanges settings_changes;
};
}
diff --git a/src/Analyzer/FunctionSecretArgumentsFinderTreeNode.h b/src/Analyzer/FunctionSecretArgumentsFinderTreeNode.h
index 8bcb6e147420..e4f63192c95b 100644
--- a/src/Analyzer/FunctionSecretArgumentsFinderTreeNode.h
+++ b/src/Analyzer/FunctionSecretArgumentsFinderTreeNode.h
@@ -71,8 +71,14 @@ class FunctionTreeNodeImpl : public AbstractFunction
{
public:
explicit ArgumentsTreeNode(const QueryTreeNodes * arguments_) : arguments(arguments_) {}
- size_t size() const override { return arguments ? arguments->size() : 0; }
- std::unique_ptr at(size_t n) const override { return std::make_unique(arguments->at(n).get()); }
+ size_t size() const override
+ { /// size withous skipped indexes
+ return arguments ? arguments->size() - skippedSize() : 0;
+ }
+ std::unique_ptr at(size_t n) const override
+ { /// n is relative index, some can be skipped
+ return std::make_unique(arguments->at(getRealIndex(n)).get());
+ }
private:
const QueryTreeNodes * arguments = nullptr;
};
diff --git a/src/Analyzer/QueryTreeBuilder.cpp b/src/Analyzer/QueryTreeBuilder.cpp
index d11fecf3011a..37c0d0472429 100644
--- a/src/Analyzer/QueryTreeBuilder.cpp
+++ b/src/Analyzer/QueryTreeBuilder.cpp
@@ -762,7 +762,12 @@ QueryTreeNodePtr QueryTreeBuilder::buildExpression(const ASTPtr & expression, co
{
const auto & function_arguments_list = function->arguments->as()->children;
for (const auto & argument : function_arguments_list)
- function_node->getArguments().getNodes().push_back(buildExpression(argument, context));
+ {
+ if (const auto * ast_set = argument->as())
+ function_node->setSettingsChanges(ast_set->changes);
+ else
+ function_node->getArguments().getNodes().push_back(buildExpression(argument, context));
+ }
}
if (function->isWindowFunction())
diff --git a/src/Analyzer/Resolve/QueryAnalyzer.cpp b/src/Analyzer/Resolve/QueryAnalyzer.cpp
index f55197b602ef..38ad97fc0294 100644
--- a/src/Analyzer/Resolve/QueryAnalyzer.cpp
+++ b/src/Analyzer/Resolve/QueryAnalyzer.cpp
@@ -4011,6 +4011,7 @@ void QueryAnalyzer::resolveTableFunction(QueryTreeNodePtr & table_function_node,
{
auto table_function_node_to_resolve_typed = std::make_shared(table_function_argument_function_name);
table_function_node_to_resolve_typed->getArgumentsNode() = table_function_argument_function->getArgumentsNode();
+ table_function_node_to_resolve_typed->setSettingsChanges(table_function_argument_function->getSettingsChanges());
QueryTreeNodePtr table_function_node_to_resolve = std::move(table_function_node_to_resolve_typed);
if (table_function_argument_function_name == "view")
diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp
index 9872e039abfa..147c6610146f 100644
--- a/src/Common/ProfileEvents.cpp
+++ b/src/Common/ProfileEvents.cpp
@@ -360,6 +360,11 @@
M(IcebergTrivialCountOptimizationApplied, "Trivial count optimization applied while reading from Iceberg", ValueType::Number) \
M(IcebergVersionHintUsed, "Number of times version-hint.text has been used.", ValueType::Number) \
M(IcebergMinMaxIndexPrunedFiles, "Number of skipped files by using MinMax index in Iceberg", ValueType::Number) \
+ M(IcebergAvroFileParsing, "Number of times avro metadata files have been parsed.", ValueType::Number) \
+ M(IcebergAvroFileParsingMicroseconds, "Time spent for parsing avro metadata files for Iceberg tables.", ValueType::Microseconds) \
+ M(IcebergJsonFileParsing, "Number of times json metadata files have been parsed.", ValueType::Number) \
+ M(IcebergJsonFileParsingMicroseconds, "Time spent for parsing json metadata files for Iceberg tables.", ValueType::Microseconds) \
+ \
M(JoinBuildTableRowCount, "Total number of rows in the build table for a JOIN operation.", ValueType::Number) \
M(JoinProbeTableRowCount, "Total number of rows in the probe table for a JOIN operation.", ValueType::Number) \
M(JoinResultRowCount, "Total number of rows in the result of a JOIN operation.", ValueType::Number) \
@@ -688,8 +693,10 @@ The server successfully detected this situation and will download merged part fr
M(S3DeleteObjects, "Number of S3 API DeleteObject(s) calls.", ValueType::Number) \
M(S3CopyObject, "Number of S3 API CopyObject calls.", ValueType::Number) \
M(S3ListObjects, "Number of S3 API ListObjects calls.", ValueType::Number) \
+ M(S3ListObjectsMicroseconds, "Time of S3 API ListObjects execution.", ValueType::Microseconds) \
M(S3HeadObject, "Number of S3 API HeadObject calls.", ValueType::Number) \
M(S3GetObjectTagging, "Number of S3 API GetObjectTagging calls.", ValueType::Number) \
+ M(S3HeadObjectMicroseconds, "Time of S3 API HeadObject execution.", ValueType::Microseconds) \
M(S3CreateMultipartUpload, "Number of S3 API CreateMultipartUpload calls.", ValueType::Number) \
M(S3UploadPartCopy, "Number of S3 API UploadPartCopy calls.", ValueType::Number) \
M(S3UploadPart, "Number of S3 API UploadPart calls.", ValueType::Number) \
@@ -744,6 +751,7 @@ The server successfully detected this situation and will download merged part fr
M(AzureCopyObject, "Number of Azure blob storage API CopyObject calls", ValueType::Number) \
M(AzureDeleteObjects, "Number of Azure blob storage API DeleteObject(s) calls.", ValueType::Number) \
M(AzureListObjects, "Number of Azure blob storage API ListObjects calls.", ValueType::Number) \
+ M(AzureListObjectsMicroseconds, "Time of Azure blob storage API ListObjects execution.", ValueType::Microseconds) \
M(AzureGetProperties, "Number of Azure blob storage API GetProperties calls.", ValueType::Number) \
M(AzureCreateContainer, "Number of Azure blob storage API CreateContainer calls.", ValueType::Number) \
\
diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp
index 0cbb8921adc8..acd8b34b07ce 100644
--- a/src/Core/Settings.cpp
+++ b/src/Core/Settings.cpp
@@ -7480,6 +7480,25 @@ Always ignore ON CLUSTER clause for DDL queries with replicated databases.
)", 0) \
DECLARE(UInt64, archive_adaptive_buffer_max_size_bytes, 8 * DBMS_DEFAULT_BUFFER_SIZE, R"(
Limits the maximum size of the adaptive buffer used when writing to archive files (for example, tar archives)", 0) \
+ DECLARE(Timezone, iceberg_timezone_for_timestamptz, "UTC", R"(
+Timezone for Iceberg timestamptz field.
+
+Possible values:
+
+- Any valid timezone, e.g. `Europe/Berlin`, `UTC` or `Zulu`
+- `` (empty value) - use session timezone
+
+Default value is `UTC`.
+)", 0) \
+ DECLARE(Timezone, iceberg_partition_timezone, "", R"(
+Time zone by which partitioning of Iceberg tables was performed.
+Possible values:
+
+- Any valid timezone, e.g. `Europe/Berlin`, `UTC` or `Zulu`
+- `` (empty value) - use server or session timezone
+
+Default value is empty.
+)", 0) \
\
/* ####################################################### */ \
/* ########### START OF EXPERIMENTAL FEATURES ############ */ \
@@ -7631,6 +7650,15 @@ Source SQL dialect for the polyglot transpiler (e.g. 'sqlite', 'mysql', 'postgre
)", EXPERIMENTAL) \
DECLARE(Bool, enable_adaptive_memory_spill_scheduler, false, R"(
Trigger processor to spill data into external storage adpatively. grace join is supported at present.
+)", EXPERIMENTAL) \
+ DECLARE(String, object_storage_cluster, "", R"(
+Cluster to make distributed requests to object storages with alternative syntax.
+)", EXPERIMENTAL) \
+ DECLARE(UInt64, object_storage_max_nodes, 0, R"(
+Limit for hosts used for request in object storage cluster table functions - azureBlobStorageCluster, s3Cluster, hdfsCluster, etc.
+Possible values:
+- Positive integer.
+- 0 — All hosts in cluster.
)", EXPERIMENTAL) \
DECLARE(Bool, allow_experimental_delta_kernel_rs, true, R"(
Allow experimental delta-kernel-rs implementation.
@@ -7712,6 +7740,12 @@ If the number of set bits in a runtime bloom filter exceeds this ratio the filte
)", EXPERIMENTAL) \
DECLARE(Bool, rewrite_in_to_join, false, R"(
Rewrite expressions like 'x IN subquery' to JOIN. This might be useful for optimizing the whole query with join reordering.
+)", EXPERIMENTAL) \
+ DECLARE(Bool, object_storage_remote_initiator, false, R"(
+Execute the request to object storage remotely on one of the object_storage_cluster nodes.
+)", EXPERIMENTAL) \
+ DECLARE(String, object_storage_remote_initiator_cluster, "", R"(
+Cluster used to choose the remote initiator when `object_storage_remote_initiator` is true. When empty, `object_storage_cluster` is used.
)", EXPERIMENTAL) \
\
/** Experimental timeSeries* aggregate functions. */ \
diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp
index ac6d42e957f6..dac022fdee3a 100644
--- a/src/Core/SettingsChangesHistory.cpp
+++ b/src/Core/SettingsChangesHistory.cpp
@@ -39,6 +39,9 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
/// controls new feature and it's 'true' by default, use 'false' as previous_value).
/// It's used to implement `compatibility` setting (see https://github.com/ClickHouse/ClickHouse/issues/35972)
/// Note: please check if the key already exists to prevent duplicate entries.
+ addSettingsChanges(settings_changes_history, "26.3.1.20001.altinityantalya",
+ {
+ });
addSettingsChanges(settings_changes_history, "26.3",
{
{"allow_experimental_polyglot_dialect", false, false, "New setting to enable the polyglot SQL transpiler dialect."},
@@ -108,12 +111,12 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
});
addSettingsChanges(settings_changes_history, "26.1.3.20001.altinityantalya",
{
- // {"iceberg_partition_timezone", "", "", "New setting."},
+ {"iceberg_partition_timezone", "", "", "New setting."},
// {"s3_propagate_credentials_to_other_storages", false, false, "New setting"},
// {"export_merge_tree_part_filename_pattern", "", "{part_name}_{checksum}", "New setting"},
// {"use_parquet_metadata_cache", false, true, "Enables cache of parquet file metadata."},
// {"input_format_parquet_use_metadata_cache", true, false, "Obsolete. No-op"}, // https://github.com/Altinity/ClickHouse/pull/586
- // {"object_storage_remote_initiator_cluster", "", "", "New setting."},
+ {"object_storage_remote_initiator_cluster", "", "", "New setting."},
// {"iceberg_metadata_staleness_ms", 0, 0, "New setting allowing using cached metadata version at READ operations to prevent fetching from remote catalog"},
});
addSettingsChanges(settings_changes_history, "26.1",
@@ -200,7 +203,6 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
{"insert_select_deduplicate", Field{"auto"}, Field{"auto"}, "New setting"},
{"output_format_pretty_named_tuples_as_json", false, true, "New setting to control whether named tuples in Pretty format are output as JSON objects"},
{"deduplicate_insert_select", "enable_even_for_bad_queries", "enable_even_for_bad_queries", "New setting, replace insert_select_deduplicate"},
-
});
addSettingsChanges(settings_changes_history, "25.11",
{
@@ -299,15 +301,15 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
});
addSettingsChanges(settings_changes_history, "25.8.16.20001.altinityantalya",
{
- // {"allow_experimental_database_iceberg", false, true, "Turned ON by default for Antalya."},
- // {"allow_experimental_database_unity_catalog", false, true, "Turned ON by default for Antalya."},
- // {"allow_experimental_database_glue_catalog", false, true, "Turned ON by default for Antalya."},
- // {"allow_database_iceberg", false, true, "Turned ON by default for Antalya (alias)."},
- // {"allow_database_unity_catalog", false, true, "Turned ON by default for Antalya (alias)."},
- // {"allow_database_glue_catalog", false, true, "Turned ON by default for Antalya (alias)."},
+ {"allow_experimental_database_iceberg", false, true, "Turned ON by default for Antalya."},
+ {"allow_experimental_database_unity_catalog", false, true, "Turned ON by default for Antalya."},
+ {"allow_experimental_database_glue_catalog", false, true, "Turned ON by default for Antalya."},
+ {"allow_database_iceberg", false, true, "Turned ON by default for Antalya (alias)."},
+ {"allow_database_unity_catalog", false, true, "Turned ON by default for Antalya (alias)."},
+ {"allow_database_glue_catalog", false, true, "Turned ON by default for Antalya (alias)."},
// {"input_format_parquet_use_metadata_cache", true, true, "New setting, turned ON by default"}, // https://github.com/Altinity/ClickHouse/pull/586
- // {"iceberg_timezone_for_timestamptz", "UTC", "UTC", "New setting."},
- // {"object_storage_remote_initiator", false, false, "New setting."},
+ {"iceberg_timezone_for_timestamptz", "UTC", "UTC", "New setting."},
+ {"object_storage_remote_initiator", false, false, "New setting."},
// {"allow_experimental_iceberg_read_optimization", true, true, "New setting."},
// {"object_storage_cluster_join_mode", "allow", "allow", "New setting"},
// {"lock_object_storage_task_distribution_ms", 500, 500, "New setting."},
@@ -327,8 +329,8 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
// {"export_merge_tree_partition_system_table_prefer_remote_information", true, true, "New setting."},
// {"export_merge_tree_part_throw_on_pending_mutations", true, true, "New setting."},
// {"export_merge_tree_part_throw_on_pending_patch_parts", true, true, "New setting."},
- // {"object_storage_cluster", "", "", "Antalya: New setting"},
- // {"object_storage_max_nodes", 0, 0, "Antalya: New setting"},
+ {"object_storage_cluster", "", "", "Antalya: New setting"},
+ {"object_storage_max_nodes", 0, 0, "Antalya: New setting"},
});
addSettingsChanges(settings_changes_history, "25.8",
{
diff --git a/src/Databases/DataLake/Common.cpp b/src/Databases/DataLake/Common.cpp
index 681dd957b43f..8946d3412d70 100644
--- a/src/Databases/DataLake/Common.cpp
+++ b/src/Databases/DataLake/Common.cpp
@@ -61,14 +61,14 @@ std::vector splitTypeArguments(const String & type_str)
return args;
}
-DB::DataTypePtr getType(const String & type_name, bool nullable, const String & prefix)
+DB::DataTypePtr getType(const String & type_name, bool nullable, DB::ContextPtr context, const String & prefix)
{
String name = trim(type_name);
if (name.starts_with("array<") && name.ends_with(">"))
{
String inner = name.substr(6, name.size() - 7);
- return std::make_shared(getType(inner, nullable));
+ return std::make_shared(getType(inner, nullable, context));
}
if (name.starts_with("map<") && name.ends_with(">"))
@@ -79,7 +79,7 @@ DB::DataTypePtr getType(const String & type_name, bool nullable, const String &
if (args.size() != 2)
throw DB::Exception(DB::ErrorCodes::DATALAKE_DATABASE_ERROR, "Invalid data type {}", type_name);
- return std::make_shared(getType(args[0], false), getType(args[1], nullable));
+ return std::make_shared(getType(args[0], false, context), getType(args[1], nullable, context));
}
if (name.starts_with("struct<") && name.ends_with(">"))
@@ -101,13 +101,13 @@ DB::DataTypePtr getType(const String & type_name, bool nullable, const String &
String full_field_name = prefix.empty() ? field_name : prefix + "." + field_name;
field_names.push_back(full_field_name);
- field_types.push_back(getType(field_type, nullable, full_field_name));
+ field_types.push_back(getType(field_type, nullable, context, full_field_name));
}
return std::make_shared(field_types, field_names);
}
- return nullable ? DB::makeNullable(DB::Iceberg::IcebergSchemaProcessor::getSimpleType(name))
- : DB::Iceberg::IcebergSchemaProcessor::getSimpleType(name);
+ return nullable ? DB::makeNullable(DB::Iceberg::IcebergSchemaProcessor::getSimpleType(name, context))
+ : DB::Iceberg::IcebergSchemaProcessor::getSimpleType(name, context);
}
std::pair parseTableName(const std::string & name)
diff --git a/src/Databases/DataLake/Common.h b/src/Databases/DataLake/Common.h
index cd4b6214e343..9b0dd7c626a6 100644
--- a/src/Databases/DataLake/Common.h
+++ b/src/Databases/DataLake/Common.h
@@ -2,6 +2,7 @@
#include
#include
+#include
namespace DataLake
{
@@ -10,7 +11,7 @@ String trim(const String & str);
std::vector splitTypeArguments(const String & type_str);
-DB::DataTypePtr getType(const String & type_name, bool nullable, const String & prefix = "");
+DB::DataTypePtr getType(const String & type_name, bool nullable, DB::ContextPtr context, const String & prefix = "");
/// Parse a string, containing at least one dot, into a two substrings:
/// A.B.C.D.E -> A.B.C.D and E, where
diff --git a/src/Databases/DataLake/DataLakeConstants.h b/src/Databases/DataLake/DataLakeConstants.h
index 0b228bf310ec..372cc92a6631 100644
--- a/src/Databases/DataLake/DataLakeConstants.h
+++ b/src/Databases/DataLake/DataLakeConstants.h
@@ -8,6 +8,7 @@ namespace DataLake
{
static constexpr auto DATABASE_ENGINE_NAME = "DataLakeCatalog";
+static constexpr auto DATABASE_ALIAS_NAME = "Iceberg";
static constexpr std::string_view FILE_PATH_PREFIX = "file:/";
/// Some catalogs (Unity or Glue) may store not only Iceberg/DeltaLake tables but other kinds of "tables"
diff --git a/src/Databases/DataLake/DatabaseDataLake.cpp b/src/Databases/DataLake/DatabaseDataLake.cpp
index d140a03d6fa0..127eb51cfcb3 100644
--- a/src/Databases/DataLake/DatabaseDataLake.cpp
+++ b/src/Databases/DataLake/DatabaseDataLake.cpp
@@ -61,6 +61,7 @@ namespace DatabaseDataLakeSetting
extern const DatabaseDataLakeSettingsString oauth_server_uri;
extern const DatabaseDataLakeSettingsBool oauth_server_use_request_body;
extern const DatabaseDataLakeSettingsBool vended_credentials;
+ extern const DatabaseDataLakeSettingsString object_storage_cluster;
extern const DatabaseDataLakeSettingsString aws_access_key_id;
extern const DatabaseDataLakeSettingsString aws_secret_access_key;
extern const DatabaseDataLakeSettingsString region;
@@ -295,7 +296,7 @@ std::shared_ptr DatabaseDataLake::getCatalog() const
return catalog_impl;
}
-std::shared_ptr DatabaseDataLake::getConfiguration(
+StorageObjectStorageConfigurationPtr DatabaseDataLake::getConfiguration(
DatabaseDataLakeStorageType type,
DataLakeStorageSettingsPtr storage_settings) const
{
@@ -515,7 +516,7 @@ StoragePtr DatabaseDataLake::tryGetTableImpl(const String & name, ContextPtr con
auto [namespace_name, table_name] = DataLake::parseTableName(name);
- if (!catalog->tryGetTableMetadata(namespace_name, table_name, table_metadata))
+ if (!catalog->tryGetTableMetadata(namespace_name, table_name, context_, table_metadata))
return nullptr;
if (ignore_if_not_iceberg && !table_metadata.isDefaultReadableTable())
return nullptr;
@@ -650,7 +651,7 @@ StoragePtr DatabaseDataLake::tryGetTableImpl(const String & name, ContextPtr con
/// with_table_structure = false: because there will be
/// no table structure in table definition AST.
- StorageObjectStorageConfiguration::initialize(*configuration, args, context_copy, /* with_table_structure */false);
+ configuration->initialize(args, context_copy, /* with_table_structure */false);
const auto & query_settings = context_->getSettingsRef();
@@ -662,50 +663,34 @@ StoragePtr DatabaseDataLake::tryGetTableImpl(const String & name, ContextPtr con
const auto is_secondary_query = context_->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY;
- if (can_use_parallel_replicas && !is_secondary_query)
- {
- auto storage_id = StorageID(getDatabaseName(), name);
- auto storage_cluster = std::make_shared(
- parallel_replicas_cluster_name,
- configuration,
- configuration->createObjectStorage(context_copy, /* is_readonly */ false, catalog->getCredentialsConfigurationCallback(storage_id)),
- storage_id,
- columns,
- ConstraintsDescription{},
- nullptr,
- context_,
- /// Use is_table_function = true,
- /// because this table is actually stateless like a table function.
- /* is_table_function */true);
-
- storage_cluster->startup();
- return storage_cluster;
- }
+ std::string cluster_name = configuration->isClusterSupported() ? settings[DatabaseDataLakeSetting::object_storage_cluster].value : "";
- bool can_use_distributed_iterator =
- context_->getClientInfo().collaborate_with_initiator &&
- can_use_parallel_replicas;
+ if (cluster_name.empty() && can_use_parallel_replicas && !is_secondary_query)
+ cluster_name = parallel_replicas_cluster_name;
- return std::make_shared(
+ auto storage_cluster = std::make_shared(
+ cluster_name,
configuration,
configuration->createObjectStorage(context_copy, /* is_readonly */ false, catalog->getCredentialsConfigurationCallback(StorageID(getDatabaseName(), name))),
- context_copy,
StorageID(getDatabaseName(), name),
/* columns */columns,
/* constraints */ConstraintsDescription{},
- /* comment */"",
+ /* partition_by */nullptr,
+ /* order_by */nullptr,
+ context_copy,
+ /* comment */ "",
getFormatSettings(context_copy),
LoadingStrictnessLevel::CREATE,
getCatalog(),
/* if_not_exists*/true,
/* is_datalake_query*/true,
- /* distributed_processing */can_use_distributed_iterator,
- /* partition_by */nullptr,
- /* order_by */nullptr,
/// Use is_table_function = true,
/// because this table is actually stateless like a table function.
/* is_table_function */true,
/* lazy_init */true);
+
+ storage_cluster->startup();
+ return storage_cluster;
}
void DatabaseDataLake::dropTable( /// NOLINT
@@ -854,7 +839,7 @@ ASTPtr DatabaseDataLake::getCreateDatabaseQueryImpl() const
ASTPtr DatabaseDataLake::getCreateTableQueryImpl(
const String & name,
- ContextPtr /* context_ */,
+ ContextPtr context_,
bool throw_on_error) const
{
auto catalog = getCatalog();
@@ -862,7 +847,7 @@ ASTPtr DatabaseDataLake::getCreateTableQueryImpl(
const auto [namespace_name, table_name] = DataLake::parseTableName(name);
- if (!catalog->tryGetTableMetadata(namespace_name, table_name, table_metadata))
+ if (!catalog->tryGetTableMetadata(namespace_name, table_name, context_, table_metadata))
{
if (throw_on_error)
throw Exception(ErrorCodes::CANNOT_GET_CREATE_TABLE_QUERY, "Table `{}` doesn't exist", name);
@@ -955,6 +940,11 @@ void registerDatabaseDataLake(DatabaseFactory & factory)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `{}` must have arguments", database_engine_name);
}
+ if (database_engine_name == "Iceberg" && catalog_type != DatabaseDataLakeCatalogType::ICEBERG_REST)
+ {
+ throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `Iceberg` must have `rest` catalog type only");
+ }
+
for (auto & engine_arg : engine_args)
engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, args.context);
@@ -1051,6 +1041,7 @@ void registerDatabaseDataLake(DatabaseFactory & factory)
args.uuid);
};
factory.registerDatabase("DataLakeCatalog", create_fn, { .supports_arguments = true, .supports_settings = true });
+ factory.registerDatabase("Iceberg", create_fn, { .supports_arguments = true, .supports_settings = true });
}
}
diff --git a/src/Databases/DataLake/DatabaseDataLake.h b/src/Databases/DataLake/DatabaseDataLake.h
index 4dc72224eb9c..efaf9f9db316 100644
--- a/src/Databases/DataLake/DatabaseDataLake.h
+++ b/src/Databases/DataLake/DatabaseDataLake.h
@@ -84,7 +84,7 @@ class DatabaseDataLake final : public IDatabase, WithContext
void validateSettings();
std::shared_ptr getCatalog() const;
- std::shared_ptr getConfiguration(
+ StorageObjectStorageConfigurationPtr getConfiguration(
DatabaseDataLakeStorageType type,
DataLakeStorageSettingsPtr storage_settings) const;
diff --git a/src/Databases/DataLake/GlueCatalog.cpp b/src/Databases/DataLake/GlueCatalog.cpp
index 426ea0f688ab..50c23dbe8314 100644
--- a/src/Databases/DataLake/GlueCatalog.cpp
+++ b/src/Databases/DataLake/GlueCatalog.cpp
@@ -283,6 +283,7 @@ bool GlueCatalog::existsTable(const std::string & database_name, const std::stri
bool GlueCatalog::tryGetTableMetadata(
const std::string & database_name,
const std::string & table_name,
+ DB::ContextPtr /* context_ */,
TableMetadata & result) const
{
Aws::Glue::Model::GetTableRequest request;
@@ -376,7 +377,7 @@ bool GlueCatalog::tryGetTableMetadata(
column_type = "timestamptz";
}
- schema.push_back({column.GetName(), getType(column_type, can_be_nullable)});
+ schema.push_back({column.GetName(), getType(column_type, can_be_nullable, getContext())});
}
result.setSchema(schema);
}
@@ -398,9 +399,10 @@ bool GlueCatalog::tryGetTableMetadata(
void GlueCatalog::getTableMetadata(
const std::string & database_name,
const std::string & table_name,
+ DB::ContextPtr context_,
TableMetadata & result) const
{
- if (!tryGetTableMetadata(database_name, table_name, result))
+ if (!tryGetTableMetadata(database_name, table_name, context_, result))
{
throw DB::Exception(
DB::ErrorCodes::DATALAKE_DATABASE_ERROR,
@@ -509,7 +511,7 @@ GlueCatalog::ObjectStorageWithPath GlueCatalog::createObjectStorageForEarlyTable
auto storage_settings = std::make_shared();
storage_settings->loadFromSettingsChanges(settings.allChanged());
auto configuration = std::make_shared(storage_settings);
- DB::StorageObjectStorageConfiguration::initialize(*configuration, args, getContext(), false);
+ configuration->initialize(args, getContext(), false);
auto object_storage = configuration->createObjectStorage(getContext(), true, {});
diff --git a/src/Databases/DataLake/GlueCatalog.h b/src/Databases/DataLake/GlueCatalog.h
index 34c01ffc1da2..8d92650b2d98 100644
--- a/src/Databases/DataLake/GlueCatalog.h
+++ b/src/Databases/DataLake/GlueCatalog.h
@@ -46,11 +46,13 @@ class GlueCatalog final : public ICatalog, private DB::WithContext
void getTableMetadata(
const std::string & database_name,
const std::string & table_name,
+ DB::ContextPtr context_,
TableMetadata & result) const override;
bool tryGetTableMetadata(
const std::string & database_name,
const std::string & table_name,
+ DB::ContextPtr context_,
TableMetadata & result) const override;
std::optional getStorageType() const override
diff --git a/src/Databases/DataLake/HiveCatalog.cpp b/src/Databases/DataLake/HiveCatalog.cpp
index b86f70dfc4b5..bc6ff5244749 100644
--- a/src/Databases/DataLake/HiveCatalog.cpp
+++ b/src/Databases/DataLake/HiveCatalog.cpp
@@ -121,13 +121,21 @@ bool HiveCatalog::existsTable(const std::string & namespace_name, const std::str
return true;
}
-void HiveCatalog::getTableMetadata(const std::string & namespace_name, const std::string & table_name, TableMetadata & result) const
+void HiveCatalog::getTableMetadata(
+ const std::string & namespace_name,
+ const std::string & table_name,
+ DB::ContextPtr context_,
+ TableMetadata & result) const
{
- if (!tryGetTableMetadata(namespace_name, table_name, result))
+ if (!tryGetTableMetadata(namespace_name, table_name, context_, result))
throw DB::Exception(DB::ErrorCodes::DATALAKE_DATABASE_ERROR, "No response from iceberg catalog");
}
-bool HiveCatalog::tryGetTableMetadata(const std::string & namespace_name, const std::string & table_name, TableMetadata & result) const
+bool HiveCatalog::tryGetTableMetadata(
+ const std::string & namespace_name,
+ const std::string & table_name,
+ DB::ContextPtr context_,
+ TableMetadata & result) const
{
Apache::Hadoop::Hive::Table table;
@@ -155,7 +163,7 @@ bool HiveCatalog::tryGetTableMetadata(const std::string & namespace_name, const
auto columns = table.sd.cols;
for (const auto & column : columns)
{
- schema.push_back({column.name, getType(column.type, true)});
+ schema.push_back({column.name, getType(column.type, true, context_)});
}
result.setSchema(schema);
}
diff --git a/src/Databases/DataLake/HiveCatalog.h b/src/Databases/DataLake/HiveCatalog.h
index 29b4e6ce6c63..0fba0e132486 100644
--- a/src/Databases/DataLake/HiveCatalog.h
+++ b/src/Databases/DataLake/HiveCatalog.h
@@ -38,9 +38,17 @@ class HiveCatalog final : public ICatalog, private DB::WithContext
bool existsTable(const std::string & namespace_name, const std::string & table_name) const override;
- void getTableMetadata(const std::string & namespace_name, const std::string & table_name, TableMetadata & result) const override;
-
- bool tryGetTableMetadata(const std::string & namespace_name, const std::string & table_name, TableMetadata & result) const override;
+ void getTableMetadata(
+ const std::string & namespace_name,
+ const std::string & table_name,
+ DB::ContextPtr context_,
+ TableMetadata & result) const override;
+
+ bool tryGetTableMetadata(
+ const std::string & namespace_name,
+ const std::string & table_name,
+ DB::ContextPtr context_,
+ TableMetadata & result) const override;
std::optional getStorageType() const override;
diff --git a/src/Databases/DataLake/ICatalog.cpp b/src/Databases/DataLake/ICatalog.cpp
index 85d701d86840..e2170c038e52 100644
--- a/src/Databases/DataLake/ICatalog.cpp
+++ b/src/Databases/DataLake/ICatalog.cpp
@@ -102,33 +102,44 @@ void TableMetadata::setLocation(const std::string & location_)
auto pos_to_path = location_.substr(pos_to_bucket).find('/');
if (pos_to_path == std::string::npos)
- throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Unexpected location format: {}", location_);
-
- pos_to_path = pos_to_bucket + pos_to_path;
-
- location_without_path = location_.substr(0, pos_to_path);
- path = location_.substr(pos_to_path + 1);
-
- /// For Azure ABFSS format: abfss://container@account.dfs.core.windows.net/path
- /// The bucket (container) is the part before '@', not the whole string before '/'
- String bucket_part = location_.substr(pos_to_bucket, pos_to_path - pos_to_bucket);
- auto at_pos = bucket_part.find('@');
- if (at_pos != std::string::npos)
{
- /// Azure ABFSS format: extract container (before @) and account (after @)
- bucket = bucket_part.substr(0, at_pos);
- azure_account_with_suffix = bucket_part.substr(at_pos + 1);
- LOG_TEST(getLogger("TableMetadata"),
- "Parsed Azure location - container: {}, account: {}, path: {}",
- bucket, azure_account_with_suffix, path);
+ if (storage_type_str == "s3://")
+ { // empty path is allowed for AWS S3Table
+ location_without_path = location_;
+ path.clear();
+ bucket = location_.substr(pos_to_bucket);
+ }
+ else
+ throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Unexpected location format: {}", location_);
}
else
{
- /// Standard format (S3, GCS, etc.)
- bucket = bucket_part;
- LOG_TEST(getLogger("TableMetadata"),
- "Parsed location without path: {}, path: {}",
- location_without_path, path);
+ pos_to_path = pos_to_bucket + pos_to_path;
+
+ location_without_path = location_.substr(0, pos_to_path);
+ path = location_.substr(pos_to_path + 1);
+
+ /// For Azure ABFSS format: abfss://container@account.dfs.core.windows.net/path
+ /// The bucket (container) is the part before '@', not the whole string before '/'
+ String bucket_part = location_.substr(pos_to_bucket, pos_to_path - pos_to_bucket);
+ auto at_pos = bucket_part.find('@');
+ if (at_pos != std::string::npos)
+ {
+ /// Azure ABFSS format: extract container (before @) and account (after @)
+ bucket = bucket_part.substr(0, at_pos);
+ azure_account_with_suffix = bucket_part.substr(at_pos + 1);
+ LOG_TEST(getLogger("TableMetadata"),
+ "Parsed Azure location - container: {}, account: {}, path: {}",
+ bucket, azure_account_with_suffix, path);
+ }
+ else
+ {
+ /// Standard format (S3, GCS, etc.)
+ bucket = bucket_part;
+ LOG_TEST(getLogger("TableMetadata"),
+ "Parsed location without path: {}, path: {}",
+ location_without_path, path);
+ }
}
}
diff --git a/src/Databases/DataLake/ICatalog.h b/src/Databases/DataLake/ICatalog.h
index e3333c1c58cd..1e67123447e2 100644
--- a/src/Databases/DataLake/ICatalog.h
+++ b/src/Databases/DataLake/ICatalog.h
@@ -10,6 +10,14 @@
#include
#include
+namespace DB
+{
+
+class Context;
+using ContextPtr = std::shared_ptr;
+
+}
+
namespace DataLake
{
@@ -158,6 +166,7 @@ class ICatalog
virtual void getTableMetadata(
const std::string & namespace_name,
const std::string & table_name,
+ DB::ContextPtr context,
TableMetadata & result) const = 0;
/// Get table metadata in the given namespace.
@@ -165,6 +174,7 @@ class ICatalog
virtual bool tryGetTableMetadata(
const std::string & namespace_name,
const std::string & table_name,
+ DB::ContextPtr context,
TableMetadata & result) const = 0;
/// Get storage type, where Iceberg tables' data is stored.
diff --git a/src/Databases/DataLake/PaimonRestCatalog.cpp b/src/Databases/DataLake/PaimonRestCatalog.cpp
index c53859ee9f40..63e33e993e45 100644
--- a/src/Databases/DataLake/PaimonRestCatalog.cpp
+++ b/src/Databases/DataLake/PaimonRestCatalog.cpp
@@ -467,7 +467,7 @@ bool PaimonRestCatalog::existsTable(const String & database_name, const String &
return true;
}
-bool PaimonRestCatalog::tryGetTableMetadata(const String & database_name, const String & table_name, TableMetadata & result) const
+bool PaimonRestCatalog::tryGetTableMetadata(const String & database_name, const String & table_name, DB::ContextPtr /*context_*/, TableMetadata & result) const
{
try
{
@@ -593,9 +593,9 @@ Poco::JSON::Object::Ptr PaimonRestCatalog::requestRest(
return json.extract();
}
-void PaimonRestCatalog::getTableMetadata(const String & database_name, const String & table_name, TableMetadata & result) const
+void PaimonRestCatalog::getTableMetadata(const String & database_name, const String & table_name, DB::ContextPtr context_, TableMetadata & result) const
{
- if (!tryGetTableMetadata(database_name, table_name, result))
+ if (!tryGetTableMetadata(database_name, table_name, context_, result))
{
throw DB::Exception(DB::ErrorCodes::DATALAKE_DATABASE_ERROR, "No response from paimon rest catalog");
}
diff --git a/src/Databases/DataLake/PaimonRestCatalog.h b/src/Databases/DataLake/PaimonRestCatalog.h
index 78713832e288..c81722c63964 100644
--- a/src/Databases/DataLake/PaimonRestCatalog.h
+++ b/src/Databases/DataLake/PaimonRestCatalog.h
@@ -89,9 +89,9 @@ class PaimonRestCatalog final : public ICatalog, private DB::WithContext
bool existsTable(const String & database_name, const String & table_name) const override;
- void getTableMetadata(const String & database_name, const String & table_name, TableMetadata & result) const override;
+ void getTableMetadata(const String & database_name, const String & table_name, DB::ContextPtr context_, TableMetadata & result) const override;
- bool tryGetTableMetadata(const String & database_name, const String & table_name, TableMetadata & result) const override;
+ bool tryGetTableMetadata(const String & database_name, const String & table_name, DB::ContextPtr /*context_*/, TableMetadata & result) const override;
std::optional getStorageType() const override { return storage_type; }
diff --git a/src/Databases/DataLake/RestCatalog.cpp b/src/Databases/DataLake/RestCatalog.cpp
index b71744e9660e..ddbd14252a7b 100644
--- a/src/Databases/DataLake/RestCatalog.cpp
+++ b/src/Databases/DataLake/RestCatalog.cpp
@@ -807,17 +807,18 @@ DB::Names RestCatalog::parseTables(DB::ReadBuffer & buf, const std::string & bas
bool RestCatalog::existsTable(const std::string & namespace_name, const std::string & table_name) const
{
TableMetadata table_metadata;
- return tryGetTableMetadata(namespace_name, table_name, table_metadata);
+ return tryGetTableMetadata(namespace_name, table_name, getContext(), table_metadata);
}
bool RestCatalog::tryGetTableMetadata(
const std::string & namespace_name,
const std::string & table_name,
+ DB::ContextPtr context_,
TableMetadata & result) const
{
try
{
- return getTableMetadataImpl(namespace_name, table_name, result);
+ return getTableMetadataImpl(namespace_name, table_name, context_, result);
}
catch (const DB::Exception & ex)
{
@@ -829,15 +830,17 @@ bool RestCatalog::tryGetTableMetadata(
void RestCatalog::getTableMetadata(
const std::string & namespace_name,
const std::string & table_name,
+ DB::ContextPtr context_,
TableMetadata & result) const
{
- if (!getTableMetadataImpl(namespace_name, table_name, result))
+ if (!getTableMetadataImpl(namespace_name, table_name, context_, result))
throw DB::Exception(DB::ErrorCodes::DATALAKE_DATABASE_ERROR, "No response from iceberg catalog");
}
bool RestCatalog::getTableMetadataImpl(
const std::string & namespace_name,
const std::string & table_name,
+ DB::ContextPtr context_,
TableMetadata & result) const
{
LOG_DEBUG(log, "Checking table {} in namespace {}", table_name, namespace_name);
@@ -898,8 +901,8 @@ bool RestCatalog::getTableMetadataImpl(
if (result.requiresSchema())
{
// int format_version = metadata_object->getValue("format-version");
- auto schema_processor = DB::Iceberg::IcebergSchemaProcessor();
- auto id = DB::IcebergMetadata::parseTableSchema(metadata_object, schema_processor, log);
+ auto schema_processor = DB::Iceberg::IcebergSchemaProcessor(context_);
+ auto id = DB::IcebergMetadata::parseTableSchema(metadata_object, schema_processor, context_, log);
auto schema = schema_processor.getClickhouseTableSchemaById(id);
result.setSchema(*schema);
}
diff --git a/src/Databases/DataLake/RestCatalog.h b/src/Databases/DataLake/RestCatalog.h
index 0837b5b88e9d..8ebb843930a1 100644
--- a/src/Databases/DataLake/RestCatalog.h
+++ b/src/Databases/DataLake/RestCatalog.h
@@ -55,11 +55,13 @@ class RestCatalog : public ICatalog, public DB::WithContext
void getTableMetadata(
const std::string & namespace_name,
const std::string & table_name,
+ DB::ContextPtr context_,
TableMetadata & result) const override;
bool tryGetTableMetadata(
const std::string & namespace_name,
const std::string & table_name,
+ DB::ContextPtr context_,
TableMetadata & result) const override;
std::optional getStorageType() const override;
@@ -152,6 +154,7 @@ class RestCatalog : public ICatalog, public DB::WithContext
bool getTableMetadataImpl(
const std::string & namespace_name,
const std::string & table_name,
+ DB::ContextPtr context_,
TableMetadata & result) const;
Config loadConfig();
diff --git a/src/Databases/DataLake/UnityCatalog.cpp b/src/Databases/DataLake/UnityCatalog.cpp
index 886425162e9b..7b1f1e00795f 100644
--- a/src/Databases/DataLake/UnityCatalog.cpp
+++ b/src/Databases/DataLake/UnityCatalog.cpp
@@ -92,9 +92,10 @@ DB::Names UnityCatalog::getTables() const
void UnityCatalog::getTableMetadata(
const std::string & namespace_name,
const std::string & table_name,
+ DB::ContextPtr context_,
TableMetadata & result) const
{
- if (!tryGetTableMetadata(namespace_name, table_name, result))
+ if (!tryGetTableMetadata(namespace_name, table_name, context_, result))
throw DB::Exception(DB::ErrorCodes::DATALAKE_DATABASE_ERROR, "No response from unity catalog");
}
@@ -160,6 +161,7 @@ void UnityCatalog::getCredentials(const std::string & table_id, TableMetadata &
bool UnityCatalog::tryGetTableMetadata(
const std::string & schema_name,
const std::string & table_name,
+ DB::ContextPtr /* context_ */,
TableMetadata & result) const
{
auto full_table_name = warehouse + "." + schema_name + "." + table_name;
diff --git a/src/Databases/DataLake/UnityCatalog.h b/src/Databases/DataLake/UnityCatalog.h
index 85c8a57a579b..3dfe71a78fac 100644
--- a/src/Databases/DataLake/UnityCatalog.h
+++ b/src/Databases/DataLake/UnityCatalog.h
@@ -34,11 +34,13 @@ class UnityCatalog final : public ICatalog, private DB::WithContext
void getTableMetadata(
const std::string & namespace_name,
const std::string & table_name,
+ DB::ContextPtr context_,
TableMetadata & result) const override;
bool tryGetTableMetadata(
const std::string & schema_name,
const std::string & table_name,
+ DB::ContextPtr context_,
TableMetadata & result) const override;
std::optional getStorageType() const override { return std::nullopt; }
diff --git a/src/Disks/DiskObjectStorage/ObjectStorages/AzureBlobStorage/AzureObjectStorage.cpp b/src/Disks/DiskObjectStorage/ObjectStorages/AzureBlobStorage/AzureObjectStorage.cpp
index 3363efbbbc2f..0aaf8564f5bf 100644
--- a/src/Disks/DiskObjectStorage/ObjectStorages/AzureBlobStorage/AzureObjectStorage.cpp
+++ b/src/Disks/DiskObjectStorage/ObjectStorages/AzureBlobStorage/AzureObjectStorage.cpp
@@ -22,6 +22,7 @@
#include
#include
#include
+#include
namespace CurrentMetrics
@@ -34,6 +35,7 @@ namespace CurrentMetrics
namespace ProfileEvents
{
extern const Event AzureListObjects;
+ extern const Event AzureListObjectsMicroseconds;
extern const Event DiskAzureListObjects;
extern const Event AzureDeleteObjects;
extern const Event DiskAzureDeleteObjects;
@@ -85,6 +87,7 @@ class AzureIteratorAsync final : public IObjectStorageIteratorAsync
ProfileEvents::increment(ProfileEvents::AzureListObjects);
if (client->IsClientForDisk())
ProfileEvents::increment(ProfileEvents::DiskAzureListObjects);
+ ProfileEventTimeIncrement watch(ProfileEvents::AzureListObjectsMicroseconds);
chassert(batch.empty());
auto blob_list_response = client->ListBlobs(options);
@@ -192,7 +195,15 @@ void AzureObjectStorage::listObjects(const std::string & path, RelativePathsWith
else
options.PageSizeHint = settings.get()->list_object_keys_size;
- for (auto blob_list_response = client_ptr->ListBlobs(options); blob_list_response.HasPage(); blob_list_response.MoveToNextPage())
+ AzureBlobStorage::ListBlobsPagedResponse blob_list_response;
+
+ auto list_blobs = [&]()->void
+ {
+ ProfileEventTimeIncrement watch(ProfileEvents::AzureListObjectsMicroseconds);
+ blob_list_response = client_ptr->ListBlobs(options);
+ };
+
+ for (list_blobs(); blob_list_response.HasPage(); blob_list_response.MoveToNextPage())
{
ProfileEvents::increment(ProfileEvents::AzureListObjects);
if (client_ptr->IsClientForDisk())
diff --git a/src/Disks/DiskObjectStorage/ObjectStorages/S3/S3ObjectStorage.cpp b/src/Disks/DiskObjectStorage/ObjectStorages/S3/S3ObjectStorage.cpp
index b4edbae24a4b..303c58efeb73 100644
--- a/src/Disks/DiskObjectStorage/ObjectStorages/S3/S3ObjectStorage.cpp
+++ b/src/Disks/DiskObjectStorage/ObjectStorages/S3/S3ObjectStorage.cpp
@@ -33,6 +33,7 @@
#include
#include
#include
+#include
#include
#include
@@ -40,6 +41,7 @@
namespace ProfileEvents
{
extern const Event S3ListObjects;
+ extern const Event S3ListObjectsMicroseconds;
extern const Event DiskS3DeleteObjects;
extern const Event DiskS3ListObjects;
}
@@ -148,7 +150,12 @@ class S3IteratorAsync final : public IObjectStorageIteratorAsync
ProfileEvents::increment(ProfileEvents::S3ListObjects);
ProfileEvents::increment(ProfileEvents::DiskS3ListObjects);
- auto outcome = client->ListObjectsV2(*request);
+ Aws::S3::Model::ListObjectsV2Outcome outcome;
+
+ {
+ ProfileEventTimeIncrement watch(ProfileEvents::S3ListObjectsMicroseconds);
+ outcome = client->ListObjectsV2(*request);
+ }
/// Outcome failure will be handled on the caller side.
if (outcome.IsSuccess())
@@ -321,7 +328,11 @@ void S3ObjectStorage::listObjects(const std::string & path, RelativePathsWithMet
ProfileEvents::increment(ProfileEvents::S3ListObjects);
ProfileEvents::increment(ProfileEvents::DiskS3ListObjects);
- outcome = client.get()->ListObjectsV2(request);
+ {
+ ProfileEventTimeIncrement watch(ProfileEvents::S3ListObjectsMicroseconds);
+ outcome = client.get()->ListObjectsV2(request);
+ }
+
throwIfError(outcome);
auto result = outcome.GetResult();
diff --git a/src/Disks/DiskType.cpp b/src/Disks/DiskType.cpp
index bf4506b4cbf6..ddc42cd07dc3 100644
--- a/src/Disks/DiskType.cpp
+++ b/src/Disks/DiskType.cpp
@@ -10,7 +10,7 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
-MetadataStorageType metadataTypeFromString(const String & type)
+MetadataStorageType metadataTypeFromString(const std::string & type)
{
auto check_type = Poco::toLower(type);
if (check_type == "local")
@@ -58,25 +58,7 @@ String DataSourceDescription::name() const
case DataSourceType::RAM:
return "memory";
case DataSourceType::ObjectStorage:
- {
- switch (object_storage_type)
- {
- case ObjectStorageType::S3:
- return "s3";
- case ObjectStorageType::HDFS:
- return "hdfs";
- case ObjectStorageType::Azure:
- return "azure_blob_storage";
- case ObjectStorageType::Local:
- return "local_blob_storage";
- case ObjectStorageType::Web:
- return "web";
- case ObjectStorageType::None:
- return "none";
- case ObjectStorageType::Max:
- throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected object storage type: Max");
- }
- }
+ return DB::toString(object_storage_type);
}
}
@@ -86,4 +68,45 @@ String DataSourceDescription::toString() const
name(), description, is_encrypted, is_cached, zookeeper_name);
}
+ObjectStorageType objectStorageTypeFromString(const std::string & type)
+{
+ auto check_type = Poco::toLower(type);
+ if (check_type == "s3")
+ return ObjectStorageType::S3;
+ if (check_type == "hdfs")
+ return ObjectStorageType::HDFS;
+ if (check_type == "azure_blob_storage" || check_type == "azure")
+ return ObjectStorageType::Azure;
+ if (check_type == "local_blob_storage" || check_type == "local")
+ return ObjectStorageType::Local;
+ if (check_type == "web")
+ return ObjectStorageType::Web;
+ if (check_type == "none")
+ return ObjectStorageType::None;
+
+ throw Exception(ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG,
+ "Unknown object storage type: {}", type);
+}
+
+std::string toString(ObjectStorageType type)
+{
+ switch (type)
+ {
+ case ObjectStorageType::S3:
+ return "s3";
+ case ObjectStorageType::HDFS:
+ return "hdfs";
+ case ObjectStorageType::Azure:
+ return "azure_blob_storage";
+ case ObjectStorageType::Local:
+ return "local_blob_storage";
+ case ObjectStorageType::Web:
+ return "web";
+ case ObjectStorageType::None:
+ return "none";
+ case ObjectStorageType::Max:
+ throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected object storage type: Max");
+ }
+}
+
}
diff --git a/src/Disks/DiskType.h b/src/Disks/DiskType.h
index 9018cd605481..835c1341775b 100644
--- a/src/Disks/DiskType.h
+++ b/src/Disks/DiskType.h
@@ -36,7 +36,10 @@ enum class MetadataStorageType : uint8_t
Memory,
};
-MetadataStorageType metadataTypeFromString(const String & type);
+MetadataStorageType metadataTypeFromString(const std::string & type);
+
+ObjectStorageType objectStorageTypeFromString(const std::string & type);
+std::string toString(ObjectStorageType type);
struct DataSourceDescription
{
diff --git a/src/IO/ReadBufferFromS3.cpp b/src/IO/ReadBufferFromS3.cpp
index 74e654c5160b..b46a2d55c7f0 100644
--- a/src/IO/ReadBufferFromS3.cpp
+++ b/src/IO/ReadBufferFromS3.cpp
@@ -501,6 +501,12 @@ Aws::S3::Model::GetObjectResult ReadBufferFromS3::sendRequest(size_t attempt, si
log, "Read S3 object. Bucket: {}, Key: {}, Version: {}, Offset: {}",
bucket, key, version_id.empty() ? "Latest" : version_id, range_begin);
}
+ else
+ {
+ LOG_TEST(
+ log, "Read S3 object. Bucket: {}, Key: {}, Version: {}",
+ bucket, key, version_id.empty() ? "Latest" : version_id);
+ }
ProfileEvents::increment(ProfileEvents::S3GetObject);
if (client_ptr->isClientForDisk())
diff --git a/src/IO/S3/Client.cpp b/src/IO/S3/Client.cpp
index 27ecbb2a1dff..5df6a86d4327 100644
--- a/src/IO/S3/Client.cpp
+++ b/src/IO/S3/Client.cpp
@@ -455,7 +455,7 @@ Model::HeadObjectOutcome Client::HeadObject(HeadObjectRequest & request) const
auto bucket_uri = getURIForBucket(bucket);
if (!bucket_uri)
{
- if (auto maybe_error = updateURIForBucketForHead(bucket); maybe_error.has_value())
+ if (auto maybe_error = updateURIForBucketForHead(bucket, request.GetKey()); maybe_error.has_value())
return *maybe_error;
if (auto region = getRegionForBucket(bucket); !region.empty())
@@ -672,7 +672,6 @@ Client::doRequest(RequestType & request, RequestFn request_fn) const
if (auto uri = getURIForBucket(bucket); uri.has_value())
request.overrideURI(std::move(*uri));
-
bool found_new_endpoint = false;
// if we found correct endpoint after 301 responses, update the cache for future requests
SCOPE_EXIT(
@@ -1041,12 +1040,15 @@ std::optional Client::getURIFromError(const Aws::S3::S3Error & error) c
}
// Do a list request because head requests don't have body in response
-std::optional Client::updateURIForBucketForHead(const std::string & bucket) const
+// S3 Tables don't support ListObjects, so as a workaround we use GetObject instead.
+std::optional Client::updateURIForBucketForHead(const std::string & bucket, const std::string & key) const
{
- ListObjectsV2Request req;
+ GetObjectRequest req;
req.SetBucket(bucket);
- req.SetMaxKeys(1);
- auto result = ListObjectsV2(req);
+ req.SetKey(key);
+ req.SetRange("bytes=0-1");
+ auto result = GetObject(req);
+
if (result.IsSuccess())
return std::nullopt;
return result.GetError();
diff --git a/src/IO/S3/Client.h b/src/IO/S3/Client.h
index 61ee4ead3dc0..4a65239a8582 100644
--- a/src/IO/S3/Client.h
+++ b/src/IO/S3/Client.h
@@ -285,7 +285,7 @@ class Client : private Aws::S3::S3Client
void updateURIForBucket(const std::string & bucket, S3::URI new_uri) const;
std::optional getURIFromError(const Aws::S3::S3Error & error) const;
- std::optional updateURIForBucketForHead(const std::string & bucket) const;
+ std::optional updateURIForBucketForHead(const std::string & bucket, const std::string & key) const;
std::optional getURIForBucket(const std::string & bucket) const;
diff --git a/src/IO/S3/URI.cpp b/src/IO/S3/URI.cpp
index b8150e740144..5ab8e5cfd724 100644
--- a/src/IO/S3/URI.cpp
+++ b/src/IO/S3/URI.cpp
@@ -191,10 +191,72 @@ URI::URI(const std::string & uri_, bool allow_archive_path_syntax, bool keep_pre
validateKey(key, uri);
}
+bool URI::isAWSRegion(std::string_view region)
+{
+ /// List from https://docs.aws.amazon.com/general/latest/gr/s3.html
+ static const std::unordered_set regions = {
+ "us-east-2",
+ "us-east-1",
+ "us-west-1",
+ "us-west-2",
+ "af-south-1",
+ "ap-east-1",
+ "ap-south-2",
+ "ap-southeast-3",
+ "ap-southeast-5",
+ "ap-southeast-4",
+ "ap-south-1",
+ "ap-northeast-3",
+ "ap-northeast-2",
+ "ap-southeast-1",
+ "ap-southeast-2",
+ "ap-east-2",
+ "ap-southeast-7",
+ "ap-northeast-1",
+ "ca-central-1",
+ "ca-west-1",
+ "eu-central-1",
+ "eu-west-1",
+ "eu-west-2",
+ "eu-south-1",
+ "eu-west-3",
+ "eu-south-2",
+ "eu-north-1",
+ "eu-central-2",
+ "il-central-1",
+ "mx-central-1",
+ "me-south-1",
+ "me-central-1",
+ "sa-east-1",
+ "us-gov-east-1",
+ "us-gov-west-1"
+ };
+
+ /// 's3-us-west-2' is a legacy region format for S3 storage, equivalent to 'us-west-2'
+ /// See https://docs.aws.amazon.com/AmazonS3/latest/userguide/VirtualHosting.html#VirtualHostingBackwardsCompatibility
+ if (region.substr(0, 3) == "s3-")
+ region = region.substr(3);
+
+ return regions.contains(region);
+}
+
void URI::addRegionToURI(const std::string ®ion)
{
if (auto pos = endpoint.find(".amazonaws.com"); pos != std::string::npos)
+ {
+ if (pos > 0)
+ { /// Check if the region is already in the endpoint to avoid adding it a second time
+ auto prev_pos = endpoint.find_last_of("/.", pos - 1);
+ if (prev_pos == std::string::npos)
+ prev_pos = 0;
+ else
+ ++prev_pos;
+ std::string_view endpoint_region = std::string_view(endpoint).substr(prev_pos, pos - prev_pos);
+ if (isAWSRegion(endpoint_region))
+ return;
+ }
endpoint = endpoint.substr(0, pos) + "." + region + endpoint.substr(pos);
+ }
}
void URI::validateBucket(const String & bucket, const Poco::URI & uri)
diff --git a/src/IO/S3/URI.h b/src/IO/S3/URI.h
index fa259b9de451..fd45baa39774 100644
--- a/src/IO/S3/URI.h
+++ b/src/IO/S3/URI.h
@@ -44,6 +44,10 @@ struct URI
static void validateBucket(const std::string & bucket, const Poco::URI & uri);
static void validateKey(const std::string & key, const Poco::URI & uri);
+
+ /// Returns true if 'region' string is an AWS S3 region
+ /// https://docs.aws.amazon.com/general/latest/gr/s3.html
+ static bool isAWSRegion(std::string_view region);
};
}
diff --git a/src/IO/S3/getObjectInfo.cpp b/src/IO/S3/getObjectInfo.cpp
index a31c5f7add33..b54ce9cf0b6c 100644
--- a/src/IO/S3/getObjectInfo.cpp
+++ b/src/IO/S3/getObjectInfo.cpp
@@ -1,6 +1,7 @@
#include
#include
#include
+#include
#if USE_AWS_S3
@@ -15,6 +16,7 @@ namespace ProfileEvents
extern const Event S3GetObject;
extern const Event S3GetObjectTagging;
extern const Event S3HeadObject;
+ extern const Event S3HeadObjectMicroseconds;
extern const Event DiskS3GetObject;
extern const Event DiskS3GetObjectTagging;
extern const Event DiskS3HeadObject;
@@ -35,6 +37,7 @@ namespace
ProfileEvents::increment(ProfileEvents::S3HeadObject);
if (client.isClientForDisk())
ProfileEvents::increment(ProfileEvents::DiskS3HeadObject);
+ ProfileEventTimeIncrement watch(ProfileEvents::S3HeadObjectMicroseconds);
S3::HeadObjectRequest req;
req.SetBucket(bucket);
diff --git a/src/IO/S3Common.cpp b/src/IO/S3Common.cpp
index 6d49fb90ef59..d281d5102e39 100644
--- a/src/IO/S3Common.cpp
+++ b/src/IO/S3Common.cpp
@@ -19,14 +19,6 @@
#include
-namespace ProfileEvents
-{
- extern const Event S3GetObjectMetadata;
- extern const Event S3HeadObject;
- extern const Event DiskS3GetObjectMetadata;
- extern const Event DiskS3HeadObject;
-}
-
namespace DB
{
diff --git a/src/Interpreters/Cluster.cpp b/src/Interpreters/Cluster.cpp
index 9dfae5c4dcf9..95f0559748fc 100644
--- a/src/Interpreters/Cluster.cpp
+++ b/src/Interpreters/Cluster.cpp
@@ -740,9 +740,9 @@ void Cluster::initMisc()
}
}
-std::unique_ptr Cluster::getClusterWithReplicasAsShards(const Settings & settings, size_t max_replicas_from_shard) const
+std::unique_ptr Cluster::getClusterWithReplicasAsShards(const Settings & settings, size_t max_replicas_from_shard, size_t max_hosts) const
{
- return std::unique_ptr{ new Cluster(ReplicasAsShardsTag{}, *this, settings, max_replicas_from_shard)};
+ return std::unique_ptr{ new Cluster(ReplicasAsShardsTag{}, *this, settings, max_replicas_from_shard, max_hosts)};
}
std::unique_ptr Cluster::getClusterWithSingleShard(size_t index) const
@@ -791,7 +791,7 @@ void shuffleReplicas(std::vector & replicas, const Settings &
}
-Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Settings & settings, size_t max_replicas_from_shard)
+Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Settings & settings, size_t max_replicas_from_shard, size_t max_hosts)
{
if (from.addresses_with_failover.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cluster is empty");
@@ -813,6 +813,7 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Setti
if (address.is_local)
info.local_addresses.push_back(address);
+ addresses_with_failover.emplace_back(Addresses({address}));
auto pool = ConnectionPoolFactory::instance().get(
static_cast(settings[Setting::distributed_connections_pool_size]),
@@ -836,9 +837,6 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Setti
info.per_replica_pools = {std::move(pool)};
info.default_database = address.default_database;
- addresses_with_failover.emplace_back(Addresses{address});
-
- slot_to_shard.insert(std::end(slot_to_shard), info.weight, shards_info.size());
shards_info.emplace_back(std::move(info));
}
};
@@ -860,10 +858,37 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Setti
secret = from.secret;
name = from.name;
+ constrainShardInfoAndAddressesToMaxHosts(max_hosts);
+
+ for (size_t i = 0; i < shards_info.size(); ++i)
+ slot_to_shard.insert(std::end(slot_to_shard), shards_info[i].weight, i);
+
initMisc();
}
+void Cluster::constrainShardInfoAndAddressesToMaxHosts(size_t max_hosts)
+{
+ if (max_hosts == 0 || shards_info.size() <= max_hosts)
+ return;
+
+ pcg64_fast gen{randomSeed()};
+ std::shuffle(shards_info.begin(), shards_info.end(), gen);
+ shards_info.resize(max_hosts);
+
+ AddressesWithFailover addresses_with_failover_;
+
+ UInt32 shard_num = 0;
+ for (auto & shard_info : shards_info)
+ {
+ addresses_with_failover_.push_back(addresses_with_failover[shard_info.shard_num - 1]);
+ shard_info.shard_num = ++shard_num;
+ }
+
+ addresses_with_failover.swap(addresses_with_failover_);
+}
+
+
Cluster::Cluster(Cluster::SubclusterTag, const Cluster & from, const std::vector & indices)
{
for (size_t index : indices)
diff --git a/src/Interpreters/Cluster.h b/src/Interpreters/Cluster.h
index b00cf5738f4b..b20f989da2d2 100644
--- a/src/Interpreters/Cluster.h
+++ b/src/Interpreters/Cluster.h
@@ -276,7 +276,7 @@ class Cluster
std::unique_ptr getClusterWithMultipleShards(const std::vector & indices) const;
/// Get a new Cluster that contains all servers (all shards with all replicas) from existing cluster as independent shards.
- std::unique_ptr getClusterWithReplicasAsShards(const Settings & settings, size_t max_replicas_from_shard = 0) const;
+ std::unique_ptr getClusterWithReplicasAsShards(const Settings & settings, size_t max_replicas_from_shard = 0, size_t max_hosts = 0) const;
/// Returns false if cluster configuration doesn't allow to use it for cross-replication.
/// NOTE: true does not mean, that it's actually a cross-replication cluster.
@@ -302,7 +302,7 @@ class Cluster
/// For getClusterWithReplicasAsShards implementation
struct ReplicasAsShardsTag {};
- Cluster(ReplicasAsShardsTag, const Cluster & from, const Settings & settings, size_t max_replicas_from_shard);
+ Cluster(ReplicasAsShardsTag, const Cluster & from, const Settings & settings, size_t max_replicas_from_shard, size_t max_hosts);
void addShard(
const Settings & settings,
@@ -313,6 +313,9 @@ class Cluster
UInt32 weight = 1,
bool internal_replication = false);
+ /// Reduce size of cluster to max_hosts
+ void constrainShardInfoAndAddressesToMaxHosts(size_t max_hosts);
+
/// Inter-server secret
String secret;
diff --git a/src/Interpreters/ClusterDiscovery.cpp b/src/Interpreters/ClusterDiscovery.cpp
index 98f57c31e9e5..e150da22cb1e 100644
--- a/src/Interpreters/ClusterDiscovery.cpp
+++ b/src/Interpreters/ClusterDiscovery.cpp
@@ -290,17 +290,32 @@ Strings ClusterDiscovery::getNodeNames(zkutil::ZooKeeperPtr & zk,
auto callback = get_nodes_callbacks.find(cluster_name);
if (callback == get_nodes_callbacks.end())
{
- auto watch_dynamic_callback = std::make_shared([
- cluster_name,
- my_clusters_to_update = clusters_to_update,
- my_discovery_paths_need_update = multicluster_discovery_paths[zk_root_index - 1].need_update
- ](auto)
- {
- my_discovery_paths_need_update->store(true);
- my_clusters_to_update->set(cluster_name);
- });
- auto res = get_nodes_callbacks.insert(std::make_pair(cluster_name, watch_dynamic_callback));
- callback = res.first;
+ if (zk_root_index > 0)
+ {
+ auto watch_dynamic_callback = std::make_shared([
+ cluster_name,
+ my_clusters_to_update = clusters_to_update,
+ my_discovery_paths_need_update = multicluster_discovery_paths[zk_root_index - 1].need_update
+ ](auto)
+ {
+ my_discovery_paths_need_update->store(true);
+ my_clusters_to_update->set(cluster_name);
+ });
+ auto res = get_nodes_callbacks.insert(std::make_pair(cluster_name, watch_dynamic_callback));
+ callback = res.first;
+ }
+ else
+ { // zk_root_index == 0 for static clusters
+ auto watch_dynamic_callback = std::make_shared([
+ cluster_name,
+ my_clusters_to_update = clusters_to_update
+ ](auto)
+ {
+ my_clusters_to_update->set(cluster_name);
+ });
+ auto res = get_nodes_callbacks.insert(std::make_pair(cluster_name, watch_dynamic_callback));
+ callback = res.first;
+ }
}
nodes = zk->getChildrenWatch(getShardsListPath(zk_root), &stat, callback->second);
}
diff --git a/src/Interpreters/IcebergMetadataLog.cpp b/src/Interpreters/IcebergMetadataLog.cpp
index 2df936e77253..c0552c14e0c2 100644
--- a/src/Interpreters/IcebergMetadataLog.cpp
+++ b/src/Interpreters/IcebergMetadataLog.cpp
@@ -12,6 +12,7 @@
#include
#include
#include
+#include
#include
#include
@@ -80,7 +81,7 @@ void IcebergMetadataLogElement::appendToBlock(MutableColumns & columns) const
void insertRowToLogTable(
const ContextPtr & local_context,
- String row,
+ std::function get_row,
IcebergMetadataLogLevel row_log_level,
const String & table_path,
const String & file_path,
@@ -108,7 +109,7 @@ void insertRowToLogTable(
.content_type = row_log_level,
.table_path = table_path,
.file_path = file_path,
- .metadata_content = row,
+ .metadata_content = get_row(),
.row_in_file = row_in_file,
.pruning_status = pruning_status});
}
diff --git a/src/Interpreters/IcebergMetadataLog.h b/src/Interpreters/IcebergMetadataLog.h
index 0a86cf921083..be1114c4a847 100644
--- a/src/Interpreters/IcebergMetadataLog.h
+++ b/src/Interpreters/IcebergMetadataLog.h
@@ -26,9 +26,11 @@ struct IcebergMetadataLogElement
void appendToBlock(MutableColumns & columns) const;
};
+/// Here a `get_row` function is used instead of a `row` string so the string is computed only when required.
+/// Inside `insertRowToLogTable` code can exit immediately after `iceberg_metadata_log_level` setting check.
void insertRowToLogTable(
const ContextPtr & local_context,
- String row,
+ std::function get_row,
IcebergMetadataLogLevel row_log_level,
const String & table_path,
const String & file_path,
diff --git a/src/Interpreters/InterpreterCreateQuery.cpp b/src/Interpreters/InterpreterCreateQuery.cpp
index 8cffff74a894..8540f1893900 100644
--- a/src/Interpreters/InterpreterCreateQuery.cpp
+++ b/src/Interpreters/InterpreterCreateQuery.cpp
@@ -2026,8 +2026,7 @@ bool InterpreterCreateQuery::doCreateTable(ASTCreateQuery & create,
auto table_function_ast = create.as_table_function->ptr();
auto table_function = TableFunctionFactory::instance().get(table_function_ast, getContext());
- if (!table_function->canBeUsedToCreateTable())
- throw Exception(ErrorCodes::BAD_ARGUMENTS, "Table function '{}' cannot be used to create a table", table_function->getName());
+ table_function->validateUseToCreateTable();
/// In case of CREATE AS table_function() query we should use global context
/// in storage creation because there will be no query context on server startup
diff --git a/src/Interpreters/InterpreterInsertQuery.cpp b/src/Interpreters/InterpreterInsertQuery.cpp
index a254a0ad9ea5..69281ec1f95d 100644
--- a/src/Interpreters/InterpreterInsertQuery.cpp
+++ b/src/Interpreters/InterpreterInsertQuery.cpp
@@ -814,6 +814,9 @@ std::optional InterpreterInsertQuery::distributedWriteIntoReplica
if (!src_storage_cluster)
return {};
+ if (src_storage_cluster->getClusterName(local_context).empty())
+ return {};
+
if (!isInsertSelectTrivialEnoughForDistributedExecution(query))
return {};
diff --git a/src/Parsers/ASTSetQuery.cpp b/src/Parsers/ASTSetQuery.cpp
index f4fe077280ac..968e4f4ee569 100644
--- a/src/Parsers/ASTSetQuery.cpp
+++ b/src/Parsers/ASTSetQuery.cpp
@@ -131,7 +131,8 @@ void ASTSetQuery::formatImpl(WriteBuffer & ostr, const FormatSettings & format,
return true;
}
- if (DataLake::DATABASE_ENGINE_NAME == state.create_engine_name)
+ if (DataLake::DATABASE_ENGINE_NAME == state.create_engine_name
+ || DataLake::DATABASE_ALIAS_NAME == state.create_engine_name)
{
if (DataLake::SETTINGS_TO_HIDE.contains(change.name))
{
diff --git a/src/Parsers/FunctionSecretArgumentsFinder.h b/src/Parsers/FunctionSecretArgumentsFinder.h
index 8a3ef97422e8..e2b9a957f833 100644
--- a/src/Parsers/FunctionSecretArgumentsFinder.h
+++ b/src/Parsers/FunctionSecretArgumentsFinder.h
@@ -3,9 +3,12 @@
#include
#include
#include
+#include
+#include
#include
#include
#include
+#include
namespace DB
@@ -29,6 +32,21 @@ class AbstractFunction
virtual ~Arguments() = default;
virtual size_t size() const = 0;
virtual std::unique_ptr at(size_t n) const = 0;
+ void skipArgument(size_t n) { skipped_indexes.insert(n); }
+ void unskipArguments() { skipped_indexes.clear(); }
+ size_t getRealIndex(size_t n) const
+ {
+ for (auto idx : skipped_indexes)
+ {
+ if (n < idx)
+ break;
+ ++n;
+ }
+ return n;
+ }
+ size_t skippedSize() const { return skipped_indexes.size(); }
+ private:
+ std::set skipped_indexes;
};
virtual ~AbstractFunction() = default;
@@ -77,14 +95,15 @@ class FunctionSecretArgumentsFinder
{
if (index >= function->arguments->size())
return;
+ auto real_index = function->arguments->getRealIndex(index);
if (!result.count)
{
- result.start = index;
+ result.start = real_index;
result.are_named = argument_is_named;
}
- chassert(index >= result.start); /// We always check arguments consecutively
+ chassert(real_index >= result.start); /// We always check arguments consecutively
chassert(result.replacement.empty()); /// We shouldn't use replacement with masking other arguments
- result.count = index + 1 - result.start;
+ result.count = real_index + 1 - result.start;
if (!argument_is_named)
result.are_named = false;
}
@@ -102,8 +121,16 @@ class FunctionSecretArgumentsFinder
{
findMongoDBSecretArguments();
}
+ else if (function->name() == "iceberg")
+ {
+ findIcebergFunctionSecretArguments(/* is_cluster_function= */ false);
+ }
+ else if (function ->name() == "icebergCluster")
+ {
+ findIcebergFunctionSecretArguments(/* is_cluster_function= */ true);
+ }
else if ((function->name() == "s3") || (function->name() == "cosn") || (function->name() == "oss") ||
- (function->name() == "deltaLake") || (function->name() == "hudi") || (function->name() == "iceberg") ||
+ (function->name() == "deltaLake") || (function->name() == "hudi") ||
(function->name() == "gcs") || (function->name() == "icebergS3") || (function->name() == "paimon") ||
(function->name() == "paimonS3"))
{
@@ -112,7 +139,7 @@ class FunctionSecretArgumentsFinder
}
else if ((function->name() == "s3Cluster") || (function ->name() == "hudiCluster") ||
(function ->name() == "deltaLakeCluster") || (function ->name() == "deltaLakeS3Cluster") ||
- (function ->name() == "icebergS3Cluster") || (function ->name() == "icebergCluster"))
+ (function ->name() == "icebergS3Cluster"))
{
/// s3Cluster('cluster_name', 'url', 'aws_access_key_id', 'aws_secret_access_key', ...)
findS3FunctionSecretArguments(/* is_cluster_function= */ true);
@@ -270,6 +297,12 @@ class FunctionSecretArgumentsFinder
findSecretNamedArgument("secret_access_key", 1);
return;
}
+ if (is_cluster_function && isNamedCollectionName(1))
+ {
+ /// s3Cluster(cluster, named_collection, ..., secret_access_key = 'secret_access_key', ...)
+ findSecretNamedArgument("secret_access_key", 2);
+ return;
+ }
findSecretNamedArgument("secret_access_key", url_arg_idx);
@@ -277,6 +310,7 @@ class FunctionSecretArgumentsFinder
/// s3('url', NOSIGN, 'format' [, 'compression'] [, extra_credentials(..)] [, headers(..)])
/// s3('url', 'format', 'structure' [, 'compression'] [, extra_credentials(..)] [, headers(..)])
size_t count = excludeS3OrURLNestedMaps();
+
if ((url_arg_idx + 3 <= count) && (count <= url_arg_idx + 4))
{
String second_arg;
@@ -341,6 +375,48 @@ class FunctionSecretArgumentsFinder
markSecretArgument(url_arg_idx + 4);
}
+ std::string findIcebergStorageType(bool is_cluster_function)
+ {
+ std::string storage_type = "s3";
+
+ size_t count = function->arguments->size();
+ if (!count)
+ return storage_type;
+
+ auto storage_type_idx = findNamedArgument(&storage_type, "storage_type");
+ if (storage_type_idx != -1)
+ {
+ storage_type = Poco::toLower(storage_type);
+ function->arguments->skipArgument(storage_type_idx);
+ }
+ else if (isNamedCollectionName(is_cluster_function ? 1 : 0))
+ {
+ std::string collection_name;
+ if (function->arguments->at(is_cluster_function ? 1 : 0)->tryGetString(&collection_name, true))
+ {
+ NamedCollectionPtr collection = NamedCollectionFactory::instance().tryGet(collection_name);
+ if (collection && collection->has("storage_type"))
+ {
+ storage_type = Poco::toLower(collection->get("storage_type"));
+ }
+ }
+ }
+
+ return storage_type;
+ }
+
+ void findIcebergFunctionSecretArguments(bool is_cluster_function)
+ {
+ auto storage_type = findIcebergStorageType(is_cluster_function);
+
+ if (storage_type == "s3")
+ findS3FunctionSecretArguments(is_cluster_function);
+ else if (storage_type == "azure")
+ findAzureBlobStorageFunctionSecretArguments(is_cluster_function);
+
+ function->arguments->unskipArguments();
+ }
+
bool maskAzureConnectionString(ssize_t url_arg_idx, bool argument_is_named = false, size_t start = 0)
{
String url_arg;
@@ -364,7 +440,7 @@ class FunctionSecretArgumentsFinder
if (RE2::Replace(&url_arg, account_key_pattern, "AccountKey=[HIDDEN]\\1"))
{
chassert(result.count == 0); /// We shouldn't use replacement with masking other arguments
- result.start = url_arg_idx;
+ result.start = function->arguments->getRealIndex(url_arg_idx);
result.are_named = argument_is_named;
result.count = 1;
result.replacement = url_arg;
@@ -375,7 +451,7 @@ class FunctionSecretArgumentsFinder
if (RE2::Replace(&url_arg, sas_signature_pattern, "SharedAccessSignature=[HIDDEN]\\1"))
{
chassert(result.count == 0); /// We shouldn't use replacement with masking other arguments
- result.start = url_arg_idx;
+ result.start = function->arguments->getRealIndex(url_arg_idx);
result.are_named = argument_is_named;
result.count = 1;
result.replacement = url_arg;
@@ -534,6 +610,7 @@ class FunctionSecretArgumentsFinder
void findTableEngineSecretArguments()
{
const String & engine_name = function->name();
+
if (engine_name == "ExternalDistributed")
{
/// ExternalDistributed('engine', 'host:port', 'database', 'table', 'user', 'password')
@@ -551,10 +628,13 @@ class FunctionSecretArgumentsFinder
{
findMongoDBSecretArguments();
}
+ else if (engine_name == "Iceberg")
+ {
+ findIcebergTableEngineSecretArguments();
+ }
else if ((engine_name == "S3") || (engine_name == "COSN") || (engine_name == "OSS")
|| (engine_name == "DeltaLake") || (engine_name == "Hudi")
- || (engine_name == "Iceberg") || (engine_name == "IcebergS3")
- || (engine_name == "S3Queue"))
+ || (engine_name == "IcebergS3") || (engine_name == "S3Queue"))
{
/// S3('url', ['aws_access_key_id', 'aws_secret_access_key',] ...)
findS3TableEngineSecretArguments();
@@ -563,7 +643,7 @@ class FunctionSecretArgumentsFinder
{
findURLSecretArguments();
}
- else if (engine_name == "AzureBlobStorage" || engine_name == "AzureQueue")
+ else if (engine_name == "AzureBlobStorage" || engine_name == "AzureQueue" || engine_name == "IcebergAzure")
{
findAzureBlobStorageTableEngineSecretArguments();
}
@@ -681,6 +761,18 @@ class FunctionSecretArgumentsFinder
markSecretArgument(2);
}
+ void findIcebergTableEngineSecretArguments()
+ {
+ auto storage_type = findIcebergStorageType(0);
+
+ if (storage_type == "s3")
+ findS3TableEngineSecretArguments();
+ else if (storage_type == "azure")
+ findAzureBlobStorageTableEngineSecretArguments();
+
+ function->arguments->unskipArguments();
+ }
+
void findDatabaseEngineSecretArguments()
{
const String & engine_name = function->name();
@@ -697,7 +789,7 @@ class FunctionSecretArgumentsFinder
/// S3('url', 'access_key_id', 'secret_access_key')
findS3DatabaseSecretArguments();
}
- else if (engine_name == "DataLakeCatalog")
+ else if (engine_name == "DataLakeCatalog" || engine_name == "Iceberg")
{
findDataLakeCatalogSecretArguments();
}
diff --git a/src/Parsers/FunctionSecretArgumentsFinderAST.h b/src/Parsers/FunctionSecretArgumentsFinderAST.h
index 86211b3a299c..42d6ffc806c0 100644
--- a/src/Parsers/FunctionSecretArgumentsFinderAST.h
+++ b/src/Parsers/FunctionSecretArgumentsFinderAST.h
@@ -54,10 +54,13 @@ class FunctionAST : public AbstractFunction
{
public:
explicit ArgumentsAST(const ASTs * arguments_) : arguments(arguments_) {}
- size_t size() const override { return arguments ? arguments->size() : 0; }
+ size_t size() const override
+ { /// size without skipped indexes
+ return arguments ? arguments->size() - skippedSize() : 0;
+ }
std::unique_ptr at(size_t n) const override
- {
- return std::make_unique(arguments->at(n).get());
+ { /// n is a relative index; some indexes can be skipped
+ return std::make_unique(arguments->at(getRealIndex(n)).get());
}
private:
const ASTs * arguments = nullptr;
diff --git a/src/Processors/QueryPlan/ReadFromObjectStorageStep.cpp b/src/Processors/QueryPlan/ReadFromObjectStorageStep.cpp
index ee08f83b46d8..827c53b3ed6b 100644
--- a/src/Processors/QueryPlan/ReadFromObjectStorageStep.cpp
+++ b/src/Processors/QueryPlan/ReadFromObjectStorageStep.cpp
@@ -132,7 +132,7 @@ void ReadFromObjectStorageStep::initializePipeline(QueryPipelineBuilder & pipeli
size_t output_ports = pipe.numOutputPorts();
const bool parallelize_output = context->getSettingsRef()[Setting::parallelize_output_from_storages];
if (parallelize_output
- && FormatFactory::instance().checkParallelizeOutputAfterReading(configuration->format, context)
+ && FormatFactory::instance().checkParallelizeOutputAfterReading(configuration->getFormat(), context)
&& output_ports > 0 && output_ports < max_num_streams)
pipe.resize(max_num_streams);
diff --git a/src/Server/TCPHandler.cpp b/src/Server/TCPHandler.cpp
index 85ad3a977a4c..dfcd5ffca2bc 100644
--- a/src/Server/TCPHandler.cpp
+++ b/src/Server/TCPHandler.cpp
@@ -23,6 +23,7 @@
#include
#include
#include
+#include
#include
#include
#include
@@ -34,7 +35,6 @@
#include
#include
#include
-#include
#include
#include
#include
diff --git a/src/Storages/HivePartitioningUtils.cpp b/src/Storages/HivePartitioningUtils.cpp
index 86084717dd8e..060f04474e98 100644
--- a/src/Storages/HivePartitioningUtils.cpp
+++ b/src/Storages/HivePartitioningUtils.cpp
@@ -210,9 +210,9 @@ HivePartitionColumnsWithFileColumnsPair setupHivePartitioningForObjectStorage(
* Otherwise, in case `use_hive_partitioning=1`, we can keep the old behavior of extracting it from the sample path.
* And if the schema was inferred (not specified in the table definition), we need to enrich it with the path partition columns
*/
- if (configuration->partition_strategy && configuration->partition_strategy_type == PartitionStrategyFactory::StrategyType::HIVE)
+ if (configuration->getPartitionStrategy() && configuration->getPartitionStrategyType() == PartitionStrategyFactory::StrategyType::HIVE)
{
- hive_partition_columns_to_read_from_file_path = configuration->partition_strategy->getPartitionColumns();
+ hive_partition_columns_to_read_from_file_path = configuration->getPartitionStrategy()->getPartitionColumns();
sanityCheckSchemaAndHivePartitionColumns(hive_partition_columns_to_read_from_file_path, columns, /* check_contained_in_schema */true);
}
else if (context->getSettingsRef()[Setting::use_hive_partitioning])
@@ -226,7 +226,7 @@ HivePartitionColumnsWithFileColumnsPair setupHivePartitioningForObjectStorage(
sanityCheckSchemaAndHivePartitionColumns(hive_partition_columns_to_read_from_file_path, columns, /* check_contained_in_schema */false);
}
- if (configuration->partition_columns_in_data_file)
+ if (configuration->getPartitionColumnsInDataFile())
{
file_columns = columns.getAllPhysical();
}
diff --git a/src/Storages/IStorage.h b/src/Storages/IStorage.h
index f0084630324d..ed69b0365ae6 100644
--- a/src/Storages/IStorage.h
+++ b/src/Storages/IStorage.h
@@ -71,6 +71,9 @@ using ConditionSelectivityEstimatorPtr = std::shared_ptr;
+
class ActionsDAG;
/** Storage. Describes the table. Responsible for
@@ -434,6 +437,7 @@ class IStorage : public std::enable_shared_from_this, public TypePromo
size_t /*max_block_size*/,
size_t /*num_streams*/);
+public:
/// Should we process blocks of data returned by the storage in parallel
/// even when the storage returned only one stream of data for reading?
/// It is beneficial, for example, when you read from a file quickly,
@@ -444,7 +448,6 @@ class IStorage : public std::enable_shared_from_this, public TypePromo
/// useless).
virtual bool parallelizeOutputAfterReading(ContextPtr) const { return !isSystemStorage(); }
-public:
/// Other version of read which adds reading step to query plan.
/// Default implementation creates ReadFromStorageStep and uses usual read.
/// Can be called after `shutdown`, but not after `drop`.
diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp
index c6c69c0f21bc..b98779260112 100644
--- a/src/Storages/IStorageCluster.cpp
+++ b/src/Storages/IStorageCluster.cpp
@@ -1,5 +1,8 @@
#include
+#include
+#include
+
#include
#include
#include
@@ -12,6 +15,7 @@
#include
#include
#include
+#include
#include
#include
#include
@@ -20,6 +24,10 @@
#include
#include
#include
+#include
+#include
+#include
+#include
#include
#include
@@ -34,13 +42,15 @@ namespace Setting
extern const SettingsBool async_query_sending_for_remote;
extern const SettingsBool async_socket_for_remote;
extern const SettingsBool skip_unavailable_shards;
- extern const SettingsBool parallel_replicas_local_plan;
- extern const SettingsString cluster_for_parallel_replicas;
extern const SettingsNonZeroUInt64 max_parallel_replicas;
+ extern const SettingsUInt64 object_storage_max_nodes;
+ extern const SettingsBool object_storage_remote_initiator;
+ extern const SettingsString object_storage_remote_initiator_cluster;
}
namespace ErrorCodes
{
+ extern const int NOT_IMPLEMENTED;
extern const int ALL_CONNECTION_TRIES_FAILED;
}
@@ -131,22 +141,29 @@ void IStorageCluster::read(
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
- size_t /*max_block_size*/,
- size_t /*num_streams*/)
+ size_t max_block_size,
+ size_t num_streams)
{
+ auto cluster_name_from_settings = getClusterName(context);
+
+ if (!isClusterSupported() || cluster_name_from_settings.empty())
+ {
+ readFallBackToPure(query_plan, column_names, storage_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
+ return;
+ }
+
updateConfigurationIfNeeded(context);
storage_snapshot->check(column_names);
- updateBeforeRead(context);
- auto cluster = getCluster(context);
+ const auto & settings = context->getSettingsRef();
/// Calculate the header. This is significant, because some columns could be thrown away in some cases like query with count(*)
SharedHeader sample_block;
ASTPtr query_to_send = query_info.query;
- if (context->getSettingsRef()[Setting::allow_experimental_analyzer])
+ if (settings[Setting::allow_experimental_analyzer])
{
sample_block = InterpreterSelectQueryAnalyzer::getSampleBlock(query_info.query, context, SelectQueryOptions(processed_stage));
}
@@ -159,6 +176,31 @@ void IStorageCluster::read(
updateQueryToSendIfNeeded(query_to_send, storage_snapshot, context);
+ /// In case the current node is not supposed to initiate the clustered query
+ /// Sends this query to a remote initiator using the `remote` table function
+ if (settings[Setting::object_storage_remote_initiator])
+ {
+ /// Re-writes queries in the form of:
+ /// Input: SELECT * FROM iceberg(...) SETTINGS object_storage_cluster='swarm', object_storage_remote_initiator=1
+ /// Output: SELECT * FROM remote('remote_host', icebergCluster('swarm', ...))
+ /// Where `remote_host` is a random host from the cluster which will execute the query
+ /// This means the initiator node belongs to the same cluster that will execute the query
+ /// In case the object_storage_remote_initiator_cluster setting is set, the initiator may belong to a different cluster
+ auto remote_initiator_cluster_name = settings[Setting::object_storage_remote_initiator_cluster].value;
+ if (remote_initiator_cluster_name.empty())
+ remote_initiator_cluster_name = cluster_name_from_settings;
+ auto remote_initiator_cluster = getClusterImpl(context, remote_initiator_cluster_name);
+ auto storage_and_context = convertToRemote(remote_initiator_cluster, context, remote_initiator_cluster_name, query_to_send);
+ auto src_distributed = std::dynamic_pointer_cast(storage_and_context.storage);
+ auto modified_query_info = query_info;
+ modified_query_info.cluster = src_distributed->getCluster();
+ auto new_storage_snapshot = storage_and_context.storage->getStorageSnapshot(storage_snapshot->metadata, storage_and_context.context);
+ storage_and_context.storage->read(query_plan, column_names, new_storage_snapshot, modified_query_info, storage_and_context.context, processed_stage, max_block_size, num_streams);
+ return;
+ }
+
+ auto cluster = getClusterImpl(context, cluster_name_from_settings, isObjectStorage() ? settings[Setting::object_storage_max_nodes] : 0);
+
RestoreQualifiedNamesVisitor::Data data;
data.distributed_table = DatabaseAndTableWithAlias(*getTableExpression(query_info.query->as(), 0));
data.remote_table.database = context->getCurrentDatabase();
@@ -186,6 +228,95 @@ void IStorageCluster::read(
query_plan.addStep(std::move(reading));
}
+IStorageCluster::RemoteCallVariables IStorageCluster::convertToRemote(
+ ClusterPtr cluster,
+ ContextPtr context,
+ const std::string & cluster_name_from_settings,
+ ASTPtr query_to_send)
+{
+ /// TODO: Allow to use secret for remote queries
+ if (!cluster->getSecret().empty())
+ throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Can't convert query to remote when cluster uses secret");
+
+ auto host_addresses = cluster->getShardsAddresses();
+ if (host_addresses.empty())
+ throw Exception(ErrorCodes::LOGICAL_ERROR, "Empty cluster {}", cluster_name_from_settings);
+
+ pcg64 rng(randomSeed());
+ size_t shard_num = rng() % host_addresses.size();
+ auto shard_addresses = host_addresses[shard_num];
+ /// After getClusterImpl each shard must have exactly 1 replica
+ if (shard_addresses.size() != 1)
+ throw Exception(ErrorCodes::LOGICAL_ERROR, "Size of shard {} in cluster {} is not equal 1", shard_num, cluster_name_from_settings);
+ std::string host_name;
+ Poco::URI::decode(shard_addresses[0].toString(), host_name);
+
+ LOG_INFO(log, "Choose remote initiator '{}'", host_name);
+
+ bool secure = shard_addresses[0].secure == Protocol::Secure::Enable;
+ std::string remote_function_name = secure ? "remoteSecure" : "remote";
+
+ /// Clear the object_storage_remote_initiator setting to avoid an infinite chain of remote calls
+ auto new_context = Context::createCopy(context);
+ new_context->setSetting("object_storage_remote_initiator", false);
+ new_context->setSetting("object_storage_remote_initiator_cluster", String(""));
+
+ auto * select_query = query_to_send->as();
+ if (!select_query)
+ throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected SELECT query");
+
+ auto query_settings = select_query->settings();
+ if (query_settings)
+ {
+ auto & settings_ast = query_settings->as();
+ if (settings_ast.changes.removeSetting("object_storage_remote_initiator") && settings_ast.changes.empty())
+ {
+ select_query->setExpression(ASTSelectQuery::Expression::SETTINGS, {});
+ }
+ }
+
+ ASTTableExpression * table_expression = extractTableExpressionASTPtrFromSelectQuery(query_to_send);
+ if (!table_expression)
+ throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table expression");
+
+ boost::intrusive_ptr remote_query;
+
+ if (shard_addresses[0].user_specified)
+ { // with user/password for cluster access, the remote query is executed as this user; add the credentials to the query parameters
+ remote_query = makeASTFunction(remote_function_name,
+ make_intrusive(host_name),
+ table_expression->table_function,
+ make_intrusive(shard_addresses[0].user),
+ make_intrusive(shard_addresses[0].password));
+ }
+ else
+ { // without a specified user/password, the remote query is executed as the default user
+ remote_query = makeASTFunction(remote_function_name, make_intrusive(host_name), table_expression->table_function);
+ }
+
+ table_expression->table_function = remote_query;
+
+ auto remote_function = TableFunctionFactory::instance().get(remote_query, new_context);
+
+ auto storage = remote_function->execute(query_to_send, new_context, remote_function_name);
+
+ return RemoteCallVariables{storage, new_context};
+}
+
+SinkToStoragePtr IStorageCluster::write(
+ const ASTPtr & query,
+ const StorageMetadataPtr & metadata_snapshot,
+ ContextPtr context,
+ bool async_insert)
+{
+ auto cluster_name_from_settings = getClusterName(context);
+
+ if (cluster_name_from_settings.empty())
+ return writeFallBackToPure(query, metadata_snapshot, context, async_insert);
+
+ throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method write is not supported by storage {}", getName());
+}
+
void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
const Scalars & scalars = context->hasQueryContext() ? context->getQueryContext()->getScalars() : Scalars{};
@@ -278,9 +409,9 @@ ContextPtr ReadFromCluster::updateSettings(const Settings & settings)
return new_context;
}
-ClusterPtr IStorageCluster::getCluster(ContextPtr context) const
+ClusterPtr IStorageCluster::getClusterImpl(ContextPtr context, const String & cluster_name_, size_t max_hosts)
{
- return context->getCluster(cluster_name)->getClusterWithReplicasAsShards(context->getSettingsRef());
+ return context->getCluster(cluster_name_)->getClusterWithReplicasAsShards(context->getSettingsRef(), /* max_replicas_from_shard */ 0, max_hosts);
}
}
diff --git a/src/Storages/IStorageCluster.h b/src/Storages/IStorageCluster.h
index 3248b26b8c5e..ac266bf82da7 100644
--- a/src/Storages/IStorageCluster.h
+++ b/src/Storages/IStorageCluster.h
@@ -30,10 +30,16 @@ class IStorageCluster : public IStorage
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
- size_t /*max_block_size*/,
- size_t /*num_streams*/) override;
+ size_t max_block_size,
+ size_t num_streams) override;
- ClusterPtr getCluster(ContextPtr context) const;
+ SinkToStoragePtr write(
+ const ASTPtr & query,
+ const StorageMetadataPtr & metadata_snapshot,
+ ContextPtr context,
+ bool async_insert) override;
+
+ ClusterPtr getCluster(ContextPtr context) const { return getClusterImpl(context, cluster_name); }
/// Query is needed for pruning by virtual columns (_file, _path)
virtual RemoteQueryExecutor::Extension getTaskIteratorExtension(
@@ -51,13 +57,53 @@ class IStorageCluster : public IStorage
bool supportsOptimizationToSubcolumns() const override { return false; }
bool supportsTrivialCountOptimization(const StorageSnapshotPtr &, ContextPtr) const override { return true; }
+ const String & getOriginalClusterName() const { return cluster_name; }
+ virtual String getClusterName(ContextPtr /* context */) const { return getOriginalClusterName(); }
+
protected:
- virtual void updateBeforeRead(const ContextPtr &) {}
virtual void updateQueryToSendIfNeeded(ASTPtr & /*query*/, const StorageSnapshotPtr & /*storage_snapshot*/, const ContextPtr & /*context*/) {}
virtual void updateConfigurationIfNeeded(ContextPtr /* context */) {}
+ struct RemoteCallVariables
+ {
+ StoragePtr storage;
+ ContextPtr context;
+ };
+
+ RemoteCallVariables convertToRemote(
+ ClusterPtr cluster,
+ ContextPtr context,
+ const std::string & cluster_name_from_settings,
+ ASTPtr query_to_send);
+
+ virtual void readFallBackToPure(
+ QueryPlan & /* query_plan */,
+ const Names & /* column_names */,
+ const StorageSnapshotPtr & /* storage_snapshot */,
+ SelectQueryInfo & /* query_info */,
+ ContextPtr /* context */,
+ QueryProcessingStage::Enum /* processed_stage */,
+ size_t /* max_block_size */,
+ size_t /* num_streams */)
+ {
+ throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method readFallBackToPure is not supported by storage {}", getName());
+ }
+
+ virtual SinkToStoragePtr writeFallBackToPure(
+ const ASTPtr & /*query*/,
+ const StorageMetadataPtr & /*metadata_snapshot*/,
+ ContextPtr /*context*/,
+ bool /*async_insert*/)
+ {
+ throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method writeFallBackToPure is not supported by storage {}", getName());
+ }
+
private:
+ static ClusterPtr getClusterImpl(ContextPtr context, const String & cluster_name_, size_t max_hosts = 0);
+
+ virtual bool isClusterSupported() const { return true; }
+
LoggerPtr log;
String cluster_name;
};
diff --git a/src/Storages/ObjectStorage/Azure/Configuration.cpp b/src/Storages/ObjectStorage/Azure/Configuration.cpp
index f4abe3ac2c29..23aa8307bf7d 100644
--- a/src/Storages/ObjectStorage/Azure/Configuration.cpp
+++ b/src/Storages/ObjectStorage/Azure/Configuration.cpp
@@ -65,6 +65,7 @@ const std::unordered_set optional_configuration_keys = {
"partition_columns_in_data_file",
"client_id",
"tenant_id",
+ "storage_type",
};
void StorageAzureConfiguration::check(ContextPtr context)
@@ -208,10 +209,6 @@ void AzureStorageParsedArguments::fromNamedCollection(const NamedCollection & co
String connection_url;
String container_name;
- std::optional account_name;
- std::optional account_key;
- std::optional client_id;
- std::optional tenant_id;
if (collection.has("connection_string"))
connection_url = collection.get("connection_string");
@@ -392,16 +389,10 @@ void AzureStorageParsedArguments::fromAST(ASTs & engine_args, ContextPtr context
std::unordered_map engine_args_to_idx;
-
String connection_url = checkAndGetLiteralArgument(engine_args[0], "connection_string/storage_account_url");
String container_name = checkAndGetLiteralArgument(engine_args[1], "container");
blob_path = checkAndGetLiteralArgument(engine_args[2], "blobpath");
- std::optional account_name;
- std::optional account_key;
- std::optional client_id;
- std::optional tenant_id;
-
collectCredentials(extra_credentials, client_id, tenant_id, context);
auto is_format_arg = [] (const std::string & s) -> bool
@@ -451,8 +442,7 @@ void AzureStorageParsedArguments::fromAST(ASTs & engine_args, ContextPtr context
auto sixth_arg = checkAndGetLiteralArgument(engine_args[5], "partition_strategy/structure");
if (magic_enum::enum_contains(sixth_arg, magic_enum::case_insensitive))
{
- partition_strategy_type
- = magic_enum::enum_cast(sixth_arg, magic_enum::case_insensitive).value();
+ partition_strategy_type = magic_enum::enum_cast(sixth_arg, magic_enum::case_insensitive).value();
}
else
{
@@ -572,8 +562,7 @@ void AzureStorageParsedArguments::fromAST(ASTs & engine_args, ContextPtr context
auto eighth_arg = checkAndGetLiteralArgument(engine_args[7], "partition_strategy/structure");
if (magic_enum::enum_contains(eighth_arg, magic_enum::case_insensitive))
{
- partition_strategy_type
- = magic_enum::enum_cast(eighth_arg, magic_enum::case_insensitive).value();
+ partition_strategy_type = magic_enum::enum_cast(eighth_arg, magic_enum::case_insensitive).value();
}
else
{
@@ -825,6 +814,26 @@ void StorageAzureConfiguration::initializeFromParsedArguments(const AzureStorage
StorageObjectStorageConfiguration::initializeFromParsedArguments(parsed_arguments);
blob_path = parsed_arguments.blob_path;
connection_params = parsed_arguments.connection_params;
+ account_name = parsed_arguments.account_name;
+ account_key = parsed_arguments.account_key;
+ client_id = parsed_arguments.client_id;
+ tenant_id = parsed_arguments.tenant_id;
+}
+
+ASTPtr StorageAzureConfiguration::createArgsWithAccessData() const
+{
+ auto arguments = make_intrusive();
+
+ arguments->children.push_back(make_intrusive(connection_params.endpoint.storage_account_url));
+ arguments->children.push_back(make_intrusive(connection_params.endpoint.container_name));
+ arguments->children.push_back(make_intrusive(blob_path.path));
+ if (account_name && account_key)
+ {
+ arguments->children.push_back(make_intrusive(*account_name));
+ arguments->children.push_back(make_intrusive(*account_key));
+ }
+
+ return arguments;
}
void StorageAzureConfiguration::addStructureAndFormatToArgsIfNeeded(
@@ -832,13 +841,13 @@ void StorageAzureConfiguration::addStructureAndFormatToArgsIfNeeded(
{
if (disk)
{
- if (format == "auto")
+ if (getFormat() == "auto")
{
ASTs format_equal_func_args = {make_intrusive("format"), make_intrusive(format_)};
auto format_equal_func = makeASTFunction("equals", std::move(format_equal_func_args));
args.push_back(format_equal_func);
}
- if (structure == "auto")
+ if (getStructure() == "auto")
{
ASTs structure_equal_func_args = {make_intrusive("structure"), make_intrusive(structure_)};
auto structure_equal_func = makeASTFunction("equals", std::move(structure_equal_func_args));
diff --git a/src/Storages/ObjectStorage/Azure/Configuration.h b/src/Storages/ObjectStorage/Azure/Configuration.h
index 70e60e502799..41c9795b1a78 100644
--- a/src/Storages/ObjectStorage/Azure/Configuration.h
+++ b/src/Storages/ObjectStorage/Azure/Configuration.h
@@ -76,6 +76,11 @@ struct AzureStorageParsedArguments : private StorageParsedArguments
Path blob_path;
AzureBlobStorage::ConnectionParams connection_params;
+
+ std::optional account_name;
+ std::optional account_key;
+ std::optional client_id;
+ std::optional tenant_id;
};
class StorageAzureConfiguration : public StorageObjectStorageConfiguration
@@ -125,6 +130,7 @@ class StorageAzureConfiguration : public StorageObjectStorageConfiguration
onelake_client_secret = client_secret_;
onelake_tenant_id = tenant_id_;
}
+ ASTPtr createArgsWithAccessData() const override;
protected:
void fromDisk(const String & disk_name, ASTs & args, ContextPtr context, bool with_structure) override;
@@ -136,14 +142,21 @@ class StorageAzureConfiguration : public StorageObjectStorageConfiguration
Path blob_path;
Paths blobs_paths;
AzureBlobStorage::ConnectionParams connection_params;
- DiskPtr disk;
+
+ std::optional account_name;
+ std::optional account_key;
+ std::optional client_id;
+ std::optional tenant_id;
String onelake_client_id;
String onelake_client_secret;
String onelake_tenant_id;
+ DiskPtr disk;
+
void initializeFromParsedArguments(const AzureStorageParsedArguments & parsed_arguments);
};
+
}
#endif
diff --git a/src/Storages/ObjectStorage/DataLakes/Common/AvroForIcebergDeserializer.cpp b/src/Storages/ObjectStorage/DataLakes/Common/AvroForIcebergDeserializer.cpp
index 3c07bbb8ab4f..c27bb2ac7117 100644
--- a/src/Storages/ObjectStorage/DataLakes/Common/AvroForIcebergDeserializer.cpp
+++ b/src/Storages/ObjectStorage/DataLakes/Common/AvroForIcebergDeserializer.cpp
@@ -15,6 +15,7 @@
#include
#include
#include
+#include
namespace DB::ErrorCodes
{
@@ -22,6 +23,12 @@ namespace DB::ErrorCodes
extern const int INCORRECT_DATA;
}
+namespace ProfileEvents
+{
+ extern const Event IcebergAvroFileParsing;
+ extern const Event IcebergAvroFileParsingMicroseconds;
+}
+
namespace DB::Iceberg
{
@@ -33,6 +40,9 @@ try
: buffer(std::move(buffer_))
, manifest_file_path(manifest_file_path_)
{
+ ProfileEvents::increment(ProfileEvents::IcebergAvroFileParsing);
+ ProfileEventTimeIncrement watch(ProfileEvents::IcebergAvroFileParsingMicroseconds);
+
auto manifest_file_reader
= std::make_unique(std::make_unique(*buffer));
diff --git a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h
index 5697b586a5ea..931bd1901d75 100644
--- a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h
+++ b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h
@@ -9,6 +9,7 @@
#include
#include
+#include
#include
#include
#include
@@ -19,11 +20,17 @@
#include
#include
#include
-#include
+#include
#include
#include
#include
#include
+#include
+#include
+#include
+#include
+#include
+
#include
#include
#include
@@ -55,30 +62,32 @@ namespace ErrorCodes
namespace DataLakeStorageSetting
{
- extern DataLakeStorageSettingsDatabaseDataLakeCatalogType storage_catalog_type;
- extern DataLakeStorageSettingsString object_storage_endpoint;
- extern DataLakeStorageSettingsString storage_aws_access_key_id;
- extern DataLakeStorageSettingsString storage_aws_secret_access_key;
- extern DataLakeStorageSettingsString storage_region;
- extern DataLakeStorageSettingsString storage_aws_role_arn;
- extern DataLakeStorageSettingsString storage_aws_role_session_name;
- extern DataLakeStorageSettingsString storage_catalog_url;
- extern DataLakeStorageSettingsString storage_warehouse;
- extern DataLakeStorageSettingsString storage_catalog_credential;
-
- extern DataLakeStorageSettingsString storage_auth_scope;
- extern DataLakeStorageSettingsString storage_auth_header;
- extern DataLakeStorageSettingsString storage_oauth_server_uri;
- extern DataLakeStorageSettingsBool storage_oauth_server_use_request_body;
+ extern const DataLakeStorageSettingsDatabaseDataLakeCatalogType storage_catalog_type;
+ extern const DataLakeStorageSettingsString object_storage_endpoint;
+ extern const DataLakeStorageSettingsString storage_aws_access_key_id;
+ extern const DataLakeStorageSettingsString storage_aws_secret_access_key;
+ extern const DataLakeStorageSettingsString storage_region;
+ extern const DataLakeStorageSettingsString storage_aws_role_arn;
+ extern const DataLakeStorageSettingsString storage_aws_role_session_name;
+ extern const DataLakeStorageSettingsString storage_catalog_url;
+ extern const DataLakeStorageSettingsString storage_warehouse;
+ extern const DataLakeStorageSettingsString storage_catalog_credential;
+ extern const DataLakeStorageSettingsString storage_auth_scope;
+ extern const DataLakeStorageSettingsString storage_auth_header;
+ extern const DataLakeStorageSettingsString storage_oauth_server_uri;
+ extern const DataLakeStorageSettingsBool storage_oauth_server_use_request_body;
+ extern const DataLakeStorageSettingsString iceberg_metadata_file_path;
}
template
concept StorageConfiguration = std::derived_from;
-template
+template
class DataLakeConfiguration : public BaseStorageConfiguration, public std::enable_shared_from_this
{
public:
+ DataLakeConfiguration() {}
+
explicit DataLakeConfiguration(DataLakeStorageSettingsPtr settings_) : settings(settings_) {}
bool isDataLakeConfiguration() const override { return true; }
@@ -92,6 +101,7 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl
auto result = BaseStorageConfiguration::getRawPath().path;
return StorageObjectStorageConfiguration::Path(result.ends_with('/') ? result : result + "/");
}
+ void setRawPath(const StorageObjectStorageConfiguration::Path & path) override { BaseStorageConfiguration::setRawPath(path); }
void update(ObjectStoragePtr object_storage, ContextPtr local_context) override
{
@@ -133,13 +143,13 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl
bool supportsDelete() const override
{
- assertInitialized();
+ assertInitializedDL();
return current_metadata->supportsDelete();
}
bool supportsParallelInsert() const override
{
- assertInitialized();
+ assertInitializedDL();
return current_metadata->supportsParallelInsert();
}
@@ -150,25 +160,25 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl
std::shared_ptr catalog,
const std::optional & format_settings) override
{
- assertInitialized();
+ assertInitializedDL();
current_metadata->mutate(commands, shared_from_this(), context, storage_id, metadata_snapshot, catalog, format_settings);
}
void checkMutationIsPossible(const MutationCommands & commands) override
{
- assertInitialized();
+ assertInitializedDL();
current_metadata->checkMutationIsPossible(commands);
}
void checkAlterIsPossible(const AlterCommands & commands) override
{
- assertInitialized();
+ assertInitializedDL();
current_metadata->checkAlterIsPossible(commands);
}
void alter(const AlterCommands & params, ContextPtr context) override
{
- assertInitialized();
+ assertInitializedDL();
current_metadata->alter(params, context);
}
@@ -182,7 +192,7 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl
std::optional tryGetTableStructureFromMetadata(ContextPtr local_context) const override
{
- assertInitialized();
+ assertInitializedDL();
if (auto schema = current_metadata->getTableSchema(local_context); !schema.empty())
return ColumnsDescription(std::move(schema));
return std::nullopt;
@@ -195,7 +205,7 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl
std::optional totalRows(ContextPtr local_context) override
{
- assertInitialized();
+ assertInitializedDL();
return current_metadata->totalRows(local_context);
}
@@ -206,38 +216,38 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl
std::optional totalBytes(ContextPtr local_context) override
{
- assertInitialized();
+ assertInitializedDL();
return current_metadata->totalBytes(local_context);
}
bool isDataSortedBySortingKey(StorageMetadataPtr metadata_snapshot, ContextPtr local_context) const override
{
- assertInitialized();
+ assertInitializedDL();
return current_metadata->isDataSortedBySortingKey(metadata_snapshot, local_context);
}
std::shared_ptr getInitialSchemaByPath(ContextPtr local_context, ObjectInfoPtr object_info) const override
{
- assertInitialized();
+ assertInitializedDL();
return current_metadata->getInitialSchemaByPath(local_context, object_info);
}
std::shared_ptr getSchemaTransformer(ContextPtr local_context, ObjectInfoPtr object_info) const override
{
- assertInitialized();
+ assertInitializedDL();
return current_metadata->getSchemaTransformer(local_context, object_info);
}
std::optional getTableStateSnapshot(ContextPtr context) const override
{
- assertInitialized();
+ assertInitializedDL();
return current_metadata->getTableStateSnapshot(context);
}
std::unique_ptr buildStorageMetadataFromState(
const DataLakeTableStateSnapshot & state, ContextPtr context) const override
{
- assertInitialized();
+ assertInitializedDL();
auto metadata = current_metadata->buildStorageMetadataFromState(state, context);
if (metadata)
LOG_TEST(log, "Built storage metadata from state with columns: {}",
@@ -247,13 +257,13 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl
bool shouldReloadSchemaForConsistency(ContextPtr context) const override
{
- assertInitialized();
+ assertInitializedDL();
return current_metadata->shouldReloadSchemaForConsistency(context);
}
IDataLakeMetadata * getExternalMetadata() override
{
- assertInitialized();
+ assertInitializedDL();
return current_metadata.get();
}
@@ -261,7 +271,7 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl
bool supportsWrites() const override
{
- assertInitialized();
+ assertInitializedDL();
return current_metadata->supportsWrites();
}
@@ -272,7 +282,7 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl
StorageMetadataPtr storage_metadata,
ContextPtr context) override
{
- assertInitialized();
+ assertInitializedDL();
return current_metadata->iterate(filter_dag, callback, list_batch_size, storage_metadata, context);
}
@@ -284,7 +294,7 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl
/// because the code will be removed ASAP anyway)
DeltaLakePartitionColumns getDeltaLakePartitionColumns() const
{
- assertInitialized();
+ assertInitializedDL();
const auto * delta_lake_metadata = dynamic_cast(current_metadata.get());
if (delta_lake_metadata)
return delta_lake_metadata->getPartitionColumns();
@@ -294,18 +304,18 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl
void modifyFormatSettings(FormatSettings & settings_, const Context & local_context) const override
{
- assertInitialized();
+ assertInitializedDL();
current_metadata->modifyFormatSettings(settings_, local_context);
}
ColumnMapperPtr getColumnMapperForObject(ObjectInfoPtr object_info) const override
{
- assertInitialized();
+ assertInitializedDL();
return current_metadata->getColumnMapperForObject(object_info);
}
ColumnMapperPtr getColumnMapperForCurrentSchema(StorageMetadataPtr storage_metadata_snapshot, ContextPtr context) const override
{
- assertInitialized();
+ assertInitializedDL();
return current_metadata->getColumnMapperForCurrentSchema(storage_metadata_snapshot, context);
}
@@ -376,7 +386,7 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl
bool optimize(const StorageMetadataPtr & metadata_snapshot, ContextPtr context, const std::optional & format_settings) override
{
- assertInitialized();
+ assertInitializedDL();
return current_metadata->optimize(metadata_snapshot, context, format_settings);
}
@@ -404,6 +414,44 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl
#endif
}
+ bool isClusterSupported() const override { return is_cluster_supported; }
+
+ ASTPtr createArgsWithAccessData() const override
+ {
+ auto res = BaseStorageConfiguration::createArgsWithAccessData();
+
+ auto iceberg_metadata_file_path = (*settings)[DataLakeStorageSetting::iceberg_metadata_file_path];
+
+ if (iceberg_metadata_file_path.changed)
+ {
+ auto * arguments = res->template as();
+ if (!arguments)
+ throw Exception(ErrorCodes::LOGICAL_ERROR, "Arguments are not an expression list");
+
+ bool has_settings = false;
+
+ for (auto & arg : arguments->children)
+ {
+ if (auto * settings_ast = arg->template as())
+ {
+ has_settings = true;
+ settings_ast->changes.setSetting("iceberg_metadata_file_path", iceberg_metadata_file_path.value);
+ break;
+ }
+ }
+
+ if (!has_settings)
+ {
+ boost::intrusive_ptr settings_ast = make_intrusive();
+ settings_ast->is_standalone = false;
+ settings_ast->changes.setSetting("iceberg_metadata_file_path", iceberg_metadata_file_path.value);
+ arguments->children.push_back(settings_ast);
+ }
+ }
+
+ return res;
+ }
+
private:
const DataLakeStorageSettingsPtr settings;
ObjectStoragePtr ready_object_storage;
@@ -421,8 +469,9 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl
}
}
- void assertInitialized() const
+ void assertInitializedDL() const
{
+ BaseStorageConfiguration::assertInitialized();
if (!current_metadata)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Metadata is not initialized");
}
@@ -456,18 +505,388 @@ using StorageS3IcebergConfiguration = DataLakeConfiguration;
#endif
-#if USE_AZURE_BLOB_STORAGE
+# if USE_AZURE_BLOB_STORAGE
using StorageAzureIcebergConfiguration = DataLakeConfiguration;
using StorageAzurePaimonConfiguration = DataLakeConfiguration;
#endif
-#if USE_HDFS
+# if USE_HDFS
using StorageHDFSIcebergConfiguration = DataLakeConfiguration;
using StorageHDFSPaimonConfiguration = DataLakeConfiguration;
#endif
using StorageLocalIcebergConfiguration = DataLakeConfiguration;
-using StorageLocalPaimonConfiguration = DataLakeConfiguration