Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/build_and_test_cpp.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ on:
- 'website/**'
- '**/*.md'
- 'bindings/python/**'
- 'bindings/elixir/**'
workflow_dispatch:

concurrency:
Expand Down
103 changes: 103 additions & 0 deletions .github/workflows/build_and_test_elixir.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

name: Elixir Build and Tests

on:
push:
branches:
- main
paths-ignore:
- 'website/**'
- '**/*.md'
pull_request:
branches:
- main
paths-ignore:
- 'website/**'
- '**/*.md'
- 'bindings/cpp/**'
- 'bindings/python/**'
workflow_dispatch:

concurrency:
group: ${{ github.workflow }}-${{ github.ref }}-${{ github.event_name }}
cancel-in-progress: true

jobs:
build-and-test:
timeout-minutes: 60
runs-on: ubuntu-latest
env:
OTP_VERSION: "28.0.2"
ELIXIR_VERSION: "1.19.5"
FLUSS_TEST_CLUSTER_BIN: ${{ github.workspace }}/target/debug/fluss-test-cluster
MIX_ENV: test
steps:
- uses: actions/checkout@v6

- name: Set up BEAM
uses: erlef/setup-beam@fc68ffb90438ef2936bbb3251622353b3dcb2f93 # v1.24.0
with:
otp-version: ${{ env.OTP_VERSION }}
elixir-version: ${{ env.ELIXIR_VERSION }}

- name: Install protoc
run: sudo apt-get update && sudo apt-get install -y protobuf-compiler

- name: Rust Cache
uses: Swatinem/rust-cache@c19371144df3bb44fab255c43d04cbc2ab54d1c4 # v2.9.1

- name: Cache Mix deps and build
uses: actions/cache@v4
with:
path: |
bindings/elixir/deps
bindings/elixir/_build
key: ${{ runner.os }}-mix-otp${{ env.OTP_VERSION }}-elixir${{ env.ELIXIR_VERSION }}-${{ hashFiles('bindings/elixir/mix.lock') }}
restore-keys: |
${{ runner.os }}-mix-otp${{ env.OTP_VERSION }}-elixir${{ env.ELIXIR_VERSION }}-

- name: Build fluss-test-cluster binary
run: cargo build -p fluss-test-cluster

- name: Fetch Elixir deps
working-directory: bindings/elixir
run: mix deps.get

- name: Check formatting
working-directory: bindings/elixir
run: mix format --check-formatted

- name: Compile (warnings as errors)
working-directory: bindings/elixir
run: mix compile --warnings-as-errors

- name: Credo
working-directory: bindings/elixir
run: mix credo

- name: Run unit tests
working-directory: bindings/elixir
run: mix test

- name: Run integration tests
working-directory: bindings/elixir
run: mix test --include integration --only integration
env:
RUST_LOG: DEBUG
RUST_BACKTRACE: full
1 change: 1 addition & 0 deletions .github/workflows/build_and_test_python.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ on:
- 'website/**'
- '**/*.md'
- 'bindings/cpp/**'
- 'bindings/elixir/**'
workflow_dispatch:

concurrency:
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/build_and_test_rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ on:
- '**/*.md'
- 'bindings/python/**'
- 'bindings/cpp/**'
- 'bindings/elixir/**'
workflow_dispatch:

concurrency:
Expand Down
229 changes: 74 additions & 155 deletions bindings/elixir/test/support/cluster.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,11 @@
defmodule Fluss.Test.Cluster do
@moduledoc false

@fluss_image "apache/fluss"
@fluss_version "0.9.0-incubating"
# Shells out to the `fluss-test-cluster` CLI (from `crates/fluss-test-cluster`),
# the same binary used by the Python and C++ integration tests.

@network_name "fluss-elixir-test-network"
@zookeeper_name "zookeeper-elixir-test"
@coordinator_name "coordinator-server-elixir-test"
@tablet_server_name "tablet-server-elixir-test"

# Same fixed ports used by Python/C++ integration tests.
@coordinator_sasl_port 9123
@coordinator_plain_port 9223
@tablet_sasl_port 9124
@tablet_plain_port 9224

def bootstrap_servers, do: "127.0.0.1:#{@coordinator_plain_port}"
@cluster_name "shared-test"
@cluster_json_prefix "CLUSTER_JSON: "

def ensure_started do
case System.get_env("FLUSS_BOOTSTRAP_SERVERS") do
Expand All @@ -42,170 +32,99 @@ defmodule Fluss.Test.Cluster do
end

def stop do
for name <- [@tablet_server_name, @coordinator_name, @zookeeper_name] do
System.cmd("docker", ["rm", "-f", name], stderr_to_stdout: true)
end
if System.get_env("FLUSS_BOOTSTRAP_SERVERS") do
:ok
else
case find_cli_binary() do
{:ok, cli} ->
System.cmd(cli, ["stop", "--name", @cluster_name], stderr_to_stdout: true)
:ok

System.cmd("docker", ["network", "rm", @network_name], stderr_to_stdout: true)
:ok
{:error, _} ->
:ok
end
end
end

defp start_cluster do
if port_open?(@coordinator_plain_port) do
IO.puts("Reusing existing Fluss cluster on port #{@coordinator_plain_port}")
{:ok, bootstrap_servers()}
with {:ok, cli} <- find_cli_binary(),
{output, 0} <-
System.cmd(cli, ["start", "--sasl", "--name", @cluster_name], stderr_to_stdout: true),
{:ok, bootstrap} <- parse_cluster_json(output) do
{:ok, bootstrap}
else
do_start_cluster()
{output, code} when is_binary(output) ->
{:error, "fluss-test-cluster start failed (exit #{code}):\n#{output}"}

{:error, _} = err ->
err
end
end

defp do_start_cluster do
IO.puts("Starting Fluss cluster via Docker...")
defp find_cli_binary do
case System.get_env("FLUSS_TEST_CLUSTER_BIN") do
bin when is_binary(bin) and bin != "" ->
if File.regular?(bin),
do: {:ok, bin},
else: {:error, "FLUSS_TEST_CLUSTER_BIN=#{bin} does not exist"}

# Remove any leftover containers from previous runs
for name <- [@tablet_server_name, @coordinator_name, @zookeeper_name] do
System.cmd("docker", ["rm", "-f", name], stderr_to_stdout: true)
end

System.cmd("docker", ["network", "create", @network_name], stderr_to_stdout: true)

sasl_jaas =
~s(org.apache.fluss.security.auth.sasl.plain.PlainLoginModule required user_admin="admin-secret" user_alice="alice-secret";)

coordinator_props =
Enum.join(
[
"zookeeper.address: #{@zookeeper_name}:2181",
"bind.listeners: INTERNAL://#{@coordinator_name}:0, CLIENT://#{@coordinator_name}:9123, PLAIN_CLIENT://#{@coordinator_name}:9223",
"advertised.listeners: CLIENT://localhost:#{@coordinator_sasl_port}, PLAIN_CLIENT://localhost:#{@coordinator_plain_port}",
"internal.listener.name: INTERNAL",
"security.protocol.map: CLIENT:sasl",
"security.sasl.enabled.mechanisms: plain",
"security.sasl.plain.jaas.config: #{sasl_jaas}",
"netty.server.num-network-threads: 1",
"netty.server.num-worker-threads: 3"
],
"\n"
)

tablet_props =
Enum.join(
[
"zookeeper.address: #{@zookeeper_name}:2181",
"bind.listeners: INTERNAL://#{@tablet_server_name}:0, CLIENT://#{@tablet_server_name}:9123, PLAIN_CLIENT://#{@tablet_server_name}:9223",
"advertised.listeners: CLIENT://localhost:#{@tablet_sasl_port}, PLAIN_CLIENT://localhost:#{@tablet_plain_port}",
"internal.listener.name: INTERNAL",
"security.protocol.map: CLIENT:sasl",
"security.sasl.enabled.mechanisms: plain",
"security.sasl.plain.jaas.config: #{sasl_jaas}",
"tablet-server.id: 0",
"netty.server.num-network-threads: 1",
"netty.server.num-worker-threads: 3"
],
"\n"
)

docker_run([
"--name",
@zookeeper_name,
"--network",
@network_name,
"-d",
"zookeeper:3.9.2"
])

docker_run([
"--name",
@coordinator_name,
"--network",
@network_name,
"-p",
"#{@coordinator_sasl_port}:9123",
"-p",
"#{@coordinator_plain_port}:9223",
"-e",
"FLUSS_PROPERTIES=#{coordinator_props}",
"-d",
"#{@fluss_image}:#{@fluss_version}",
"coordinatorServer"
])

docker_run([
"--name",
@tablet_server_name,
"--network",
@network_name,
"-p",
"#{@tablet_sasl_port}:9123",
"-p",
"#{@tablet_plain_port}:9223",
"-e",
"FLUSS_PROPERTIES=#{tablet_props}",
"-d",
"#{@fluss_image}:#{@fluss_version}",
"tabletServer"
])

all_ports = [@coordinator_plain_port, @tablet_plain_port]

if wait_for_ports(all_ports, 90) do
IO.puts("Fluss cluster started successfully.")
{:ok, bootstrap_servers()}
else
{:error, "Cluster ports did not become ready within timeout"}
_ ->
locate_via_cargo()
end
end

defp docker_run(args) do
{output, code} = System.cmd("docker", ["run" | args], stderr_to_stdout: true)
defp locate_via_cargo do
case System.cmd("cargo", ["locate-project", "--workspace", "--message-format", "plain"],
stderr_to_stdout: true
) do
{output, 0} ->
output |> String.trim() |> Path.dirname() |> find_binary_in_target()

if code != 0 do
IO.puts("Docker run warning (code #{code}): #{output}")
{output, code} ->
{:error, "cargo locate-project failed (exit #{code}): #{output}"}
end
end

defp wait_for_ports(ports, timeout_s) do
deadline = System.monotonic_time(:second) + timeout_s
defp find_binary_in_target(root) do
Enum.find_value(
["debug", "release"],
{:error, "fluss-test-cluster binary not found. Run: cargo build -p fluss-test-cluster"},
&check_binary(root, &1)
)
end

Enum.all?(ports, fn port ->
remaining = deadline - System.monotonic_time(:second)
remaining > 0 and wait_for_port(port, remaining)
end)
defp check_binary(root, profile) do
path = Path.join([root, "target", profile, "fluss-test-cluster"])
if File.regular?(path), do: {:ok, path}, else: nil
end

defp wait_for_port(port, timeout_s) do
deadline = System.monotonic_time(:second) + timeout_s
defp parse_cluster_json(output) do
output
|> String.split("\n", trim: true)
|> Enum.find_value(
{:error, "No #{@cluster_json_prefix} token in output:\n#{output}"},
&extract_bootstrap/1
)
end

Stream.repeatedly(fn ->
case :gen_tcp.connect(~c"localhost", port, [], 1000) do
{:ok, socket} ->
:gen_tcp.close(socket)
:ok
defp extract_bootstrap(line) do
case String.split(line, @cluster_json_prefix, parts: 2) do
[_, json] ->
case decode_bootstrap(json) do
{:ok, bootstrap} -> {:ok, bootstrap}
_ -> nil
end

{:error, _} ->
Process.sleep(1000)
:retry
end
end)
|> Enum.reduce_while(false, fn
:ok, _acc ->
{:halt, true}

:retry, _acc ->
if System.monotonic_time(:second) >= deadline,
do: {:halt, false},
else: {:cont, false}
end)
_ ->
nil
end
end

defp port_open?(port) do
case :gen_tcp.connect(~c"localhost", port, [], 1000) do
{:ok, socket} ->
:gen_tcp.close(socket)
true

{:error, _} ->
false
# Minimal JSON extractor for `bootstrap_servers`: avoids adding a JSON dep just for tests.
defp decode_bootstrap(json) do
case Regex.run(~r/"bootstrap_servers"\s*:\s*"([^"]+)"/, json) do
[_, servers] -> {:ok, servers}
_ -> {:error, "no bootstrap_servers in: #{json}"}
end
end
end
Loading