Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions .github/workflows/bot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,9 @@ jobs:
- scalaProfile: "scala-2.13"
sparkProfile: "spark4.1"
sparkModules: "hudi-spark-datasource/hudi-spark4.1.x"
- scalaProfile: "scala-2.13"
sparkProfile: "spark4.2"
sparkModules: "hudi-spark-datasource/hudi-spark4.2.x"

steps:
- uses: actions/checkout@v5
Expand Down Expand Up @@ -664,6 +667,9 @@ jobs:
- scalaProfile: "scala-2.13"
sparkProfile: "spark4.1"
sparkModules: "hudi-spark-datasource/hudi-spark4.1.x"
- scalaProfile: "scala-2.13"
sparkProfile: "spark4.2"
sparkModules: "hudi-spark-datasource/hudi-spark4.2.x"

steps:
- uses: actions/checkout@v5
Expand Down Expand Up @@ -727,6 +733,9 @@ jobs:
- scalaProfile: "scala-2.13"
sparkProfile: "spark4.1"
sparkModules: "hudi-spark-datasource/hudi-spark4.1.x"
- scalaProfile: "scala-2.13"
sparkProfile: "spark4.2"
sparkModules: "hudi-spark-datasource/hudi-spark4.2.x"

steps:
- uses: actions/checkout@v5
Expand Down Expand Up @@ -784,6 +793,9 @@ jobs:
- scalaProfile: "scala-2.13"
sparkProfile: "spark4.1"
sparkModules: "hudi-spark-datasource/hudi-spark4.1.x"
- scalaProfile: "scala-2.13"
sparkProfile: "spark4.2"
sparkModules: "hudi-spark-datasource/hudi-spark4.2.x"

steps:
- uses: actions/checkout@v5
Expand Down Expand Up @@ -841,6 +853,9 @@ jobs:
- scalaProfile: "scala-2.13"
sparkProfile: "spark4.1"
sparkModules: "hudi-spark-datasource/hudi-spark4.1.x"
- scalaProfile: "scala-2.13"
sparkProfile: "spark4.2"
sparkModules: "hudi-spark-datasource/hudi-spark4.2.x"

steps:
- uses: actions/checkout@v5
Expand Down Expand Up @@ -1000,6 +1015,10 @@ jobs:
flinkProfile: 'flink1.20'
sparkProfile: 'spark4.1'
sparkRuntime: 'spark4.1.1'
- scalaProfile: 'scala-2.13'
flinkProfile: 'flink1.20'
sparkProfile: 'spark4.2'
sparkRuntime: 'spark4.2.0-preview4'

steps:
- uses: actions/checkout@v5
Expand Down Expand Up @@ -1112,6 +1131,12 @@ jobs:
flinkParquetVersion: '1.13.1'
sparkProfile: 'spark4.1'
sparkRuntime: 'spark4.1.1'
- scalaProfile: 'scala-2.13'
flinkProfile: 'flink1.20'
flinkAvroVersion: '1.11.4'
flinkParquetVersion: '1.13.1'
sparkProfile: 'spark4.2'
sparkRuntime: 'spark4.2.0-preview4'
steps:
- uses: actions/checkout@v5
- name: Set up JDK 17
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ Refer to the table below for building with different Spark and Scala versions.
| `-Dspark3.5 -Dscala-2.13` | hudi-spark3.5-bundle_2.13 | For Spark 3.5.x and Scala 2.13 |
| `-Dspark4.0` | hudi-spark4.0-bundle_2.13 | For Spark 4.0 and Scala 2.13 (Needs Java 17) |
| `-Dspark4.1` | hudi-spark4.1-bundle_2.13 | For Spark 4.1 and Scala 2.13 (Needs Java 17) |
| `-Dspark4.2` | hudi-spark4.2-bundle_2.13 | For Spark 4.2 and Scala 2.13 (Needs Java 17) |
| `-Dspark3` | hudi-spark3-bundle_2.12 (legacy bundle name) | For Spark 3.5.x and Scala 2.12 |

Please note that only Spark-related bundles, i.e., `hudi-spark-bundle`, `hudi-utilities-bundle`,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,14 @@ private[hudi] trait SparkVersionsSupport {
// Exact minor-version checks: true when the running Spark version string
// (from getSparkVersion, declared earlier in this trait — not visible here)
// begins with the given "major.minor" prefix, e.g. "3.5.1" matches isSpark3_5.
def isSpark3_5: Boolean = getSparkVersion.startsWith("3.5")
def isSpark4_0: Boolean = getSparkVersion.startsWith("4.0")
def isSpark4_1: Boolean = getSparkVersion.startsWith("4.1")
def isSpark4_2: Boolean = getSparkVersion.startsWith("4.2")

// "Greater than or equal" version gates using plain lexicographic String
// comparison of the raw version string against a literal.
// NOTE(review): lexicographic ordering is only correct while every numeric
// component stays single-digit — a hypothetical "3.10" would compare LESS
// than "3.4". Fine for the Spark versions handled today, but confirm before
// a double-digit minor/patch ever appears.
def gteqSpark3_3_2: Boolean = getSparkVersion >= "3.3.2"
def gteqSpark3_4: Boolean = getSparkVersion >= "3.4"
def gteqSpark3_5: Boolean = getSparkVersion >= "3.5"
def gteqSpark4_0: Boolean = getSparkVersion >= "4.0"
def gteqSpark4_1: Boolean = getSparkVersion >= "4.1"
def gteqSpark4_2: Boolean = getSparkVersion >= "4.2"
}

object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport with Logging {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ trait SparkAdapterSupport {
object SparkAdapterSupport {

lazy val sparkAdapter: SparkAdapter = {
val adapterClass = if (HoodieSparkUtils.isSpark4_1) {
val adapterClass = if (HoodieSparkUtils.isSpark4_2) {
"org.apache.spark.sql.adapter.Spark4_2Adapter"
} else if (HoodieSparkUtils.isSpark4_1) {
"org.apache.spark.sql.adapter.Spark4_1Adapter"
} else if (HoodieSparkUtils.isSpark4_0) {
"org.apache.spark.sql.adapter.Spark4_0Adapter"
Expand Down
4 changes: 2 additions & 2 deletions hudi-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -258,9 +258,9 @@

<!-- LZ4 Hash Utils -->
<dependency>
<groupId>org.lz4</groupId>
<groupId>${lz4.groupId}</groupId>
<artifactId>lz4-java</artifactId>
<version>1.8.0</version>
<version>${lz4.version}</version>
</dependency>

<dependency>
Expand Down
2 changes: 2 additions & 0 deletions hudi-spark-datasource/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ The modules are organized in a layered architecture to maximize code reuse acros
| `hudi-spark3.5.x` | Spark 3.5.x-specific adapter implementation (default). |
| `hudi-spark4.0.x` | Spark 4.0.x-specific adapter implementation. |
| `hudi-spark4.1.x` | Spark 4.1.x-specific adapter implementation. |
| `hudi-spark4.2.x` | Spark 4.2.x-specific adapter implementation. |
| `hudi-spark` | Main Spark datasource module containing Spark Session extensions, stored procedures, SQL parser, and logical plans. |

## Spark Version Support
Expand All @@ -47,6 +48,7 @@ The modules are organized in a layered architecture to maximize code reuse acros
| 3.5.x (default) | `hudi-spark3.5.x` | 2.12, 2.13 | 11+ | `-Dspark3.5` |
| 4.0.x | `hudi-spark4.0.x` | 2.13 | 17+ | `-Dspark4.0` |
| 4.1.x | `hudi-spark4.1.x` | 2.13 | 17+ | `-Dspark4.1` |
| 4.2.x | `hudi-spark4.2.x` | 2.13 | 17+ | `-Dspark4.2` |

## Key Features

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ object SparkFilterHelper {
Types.FloatType.get()
case DoubleType =>
Types.DoubleType.get()
case StringType | CharType(_) | VarcharType(_) =>
case StringType | (_: CharType) | (_: VarcharType) =>
Types.StringType.get()
case DateType =>
Types.DateType.get()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ object HoodieAnalysis extends SparkAdapterSupport {
val adaptIngestionTargetLogicalRelations: RuleBuilder = session => AdaptIngestionTargetLogicalRelations(session)

rules += adaptIngestionTargetLogicalRelations
val dataSourceV2ToV1FallbackClass = if (HoodieSparkUtils.isSpark4_1) {
val dataSourceV2ToV1FallbackClass = if (HoodieSparkUtils.isSpark4_2) {
"org.apache.spark.sql.hudi.analysis.HoodieSpark42DataSourceV2ToV1Fallback"
} else if (HoodieSparkUtils.isSpark4_1) {
"org.apache.spark.sql.hudi.analysis.HoodieSpark41DataSourceV2ToV1Fallback"
} else if (HoodieSparkUtils.isSpark4_0) {
"org.apache.spark.sql.hudi.analysis.HoodieSpark40DataSourceV2ToV1Fallback"
Expand All @@ -83,7 +85,10 @@ object HoodieAnalysis extends SparkAdapterSupport {
// leading to all relations resolving as V2 instead of current expectation of them being resolved as V1)
rules ++= Seq(dataSourceV2ToV1Fallback, resolveReferences)

if (HoodieSparkUtils.isSpark4_1) {
if (HoodieSparkUtils.isSpark4_2) {
rules += (_ => instantiateKlass(
"org.apache.spark.sql.hudi.analysis.HoodieSpark42ResolveColumnsForInsertInto"))
} else if (HoodieSparkUtils.isSpark4_1) {
rules += (_ => instantiateKlass(
"org.apache.spark.sql.hudi.analysis.HoodieSpark41ResolveColumnsForInsertInto"))
} else if (HoodieSparkUtils.isSpark4_0) {
Expand All @@ -95,7 +100,9 @@ object HoodieAnalysis extends SparkAdapterSupport {
}

val resolveAlterTableCommandsClass =
if (HoodieSparkUtils.isSpark4_1) {
if (HoodieSparkUtils.isSpark4_2) {
"org.apache.spark.sql.hudi.Spark42ResolveHudiAlterTableCommand"
} else if (HoodieSparkUtils.isSpark4_1) {
"org.apache.spark.sql.hudi.Spark41ResolveHudiAlterTableCommand"
} else if (HoodieSparkUtils.isSpark4_0) {
"org.apache.spark.sql.hudi.Spark40ResolveHudiAlterTableCommand"
Expand Down Expand Up @@ -144,7 +151,9 @@ object HoodieAnalysis extends SparkAdapterSupport {
)

val nestedSchemaPruningClass =
if (HoodieSparkUtils.isSpark4_1) {
if (HoodieSparkUtils.isSpark4_2) {
"org.apache.spark.sql.execution.datasources.Spark42NestedSchemaPruning"
} else if (HoodieSparkUtils.isSpark4_1) {
"org.apache.spark.sql.execution.datasources.Spark41NestedSchemaPruning"
} else if (HoodieSparkUtils.isSpark4_0) {
"org.apache.spark.sql.execution.datasources.Spark40NestedSchemaPruning"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1788,7 +1788,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
""".stripMargin)
spark.sql(s"insert into $sourceTable values(1, 'a1', 10, 1000)")
val nonExistentTable = "hudi_test_table"
val exception = intercept[org.apache.spark.sql.AnalysisException] {
val exception = intercept[Exception] {
spark.sql(
s"""
| MERGE INTO $nonExistentTable AS target
Expand All @@ -1800,8 +1800,9 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
| WHEN NOT MATCHED THEN INSERT *
""".stripMargin)
}
assert(exception.getMessage.contains("TABLE_OR_VIEW_NOT_FOUND") ||
exception.getMessage.contains("Table or view not found"),
val fullMsg = exception.getMessage + Option(exception.getCause).map(_.getMessage).getOrElse("")
assert(fullMsg.contains("TABLE_OR_VIEW_NOT_FOUND") ||
fullMsg.contains("Table or view not found"),
s"Expected TABLE_OR_VIEW_NOT_FOUND error but got: ${exception.getMessage}")
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,9 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
Seq(0)
)

val errorMsg = if (HoodieSparkUtils.gteqSpark4_0)
val errorMsg = if (HoodieSparkUtils.gteqSpark4_2)
"[INTERNAL_ERROR] Executed command failed. You hit a bug in Spark or the Spark plugins you use. Please, report this bug to the corresponding communities or vendors, and provide the full stack trace. SQLSTATE: XX000"
else if (HoodieSparkUtils.gteqSpark4_0)
"[INTERNAL_ERROR] Eagerly executed command failed. You hit a bug in Spark or the Spark plugins you use. Please, report this bug to the corresponding communities or vendors, and provide the full stack trace. SQLSTATE: XX000"
else
"assertion failed: Target table's field(price) cannot be the right-value of the update clause for MOR table."
Expand Down
Loading
Loading