Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@
package org.apache.ignite.internal;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.processing.AbstractProcessor;
import javax.annotation.processing.RoundEnvironment;
import javax.annotation.processing.SupportedAnnotationTypes;
Expand Down Expand Up @@ -72,8 +75,12 @@ public class MessageProcessor extends AbstractProcessor {
/** Externalizable message. */
static final String MARSHALLABLE_MESSAGE_INTERFACE = "org.apache.ignite.plugin.extensions.communication.MarshallableMessage";

/** This is the only message with zero fields. A serializer must be generated due to restrictions in our communication process. */
static final String HANDSHAKE_WAIT_MESSAGE = "org.apache.ignite.spi.communication.tcp.messages.HandshakeWaitMessage";
/** Messages with no fields. A serializer must be generated due to restrictions in our communication process. */
static final String[] EMPTY_MESSAGES = {
"org.apache.ignite.spi.communication.tcp.messages.HandshakeWaitMessage",
"org.apache.ignite.spi.discovery.zk.internal.ZkNoServersMessage",
"org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2Null",
};

/** */
private final Map<String, IgniteBiTuple<String, String>> enumMappersInUse = new HashMap<>();
Expand All @@ -83,7 +90,11 @@ public class MessageProcessor extends AbstractProcessor {
*/
@Override public boolean process(Set<? extends TypeElement> annotations, RoundEnvironment roundEnv) {
TypeMirror msgType = processingEnv.getElementUtils().getTypeElement(MESSAGE_INTERFACE).asType();
TypeMirror handshakeWaitMsgType = processingEnv.getElementUtils().getTypeElement(HANDSHAKE_WAIT_MESSAGE).asType();
List<TypeMirror> emptyMsgs = Arrays.stream(EMPTY_MESSAGES)
.map(cls -> processingEnv.getElementUtils().getTypeElement(cls))
.filter(Objects::nonNull)
.map(Element::asType)
.collect(Collectors.toList());

Map<TypeElement, List<VariableElement>> msgFields = new HashMap<>();

Expand All @@ -101,7 +112,7 @@ public class MessageProcessor extends AbstractProcessor {

List<VariableElement> fields = orderedFields(clazz);

if (!fields.isEmpty() || processingEnv.getTypeUtils().isAssignable(clazz.asType(), handshakeWaitMsgType))
if (!fields.isEmpty() || emptyMsgs.stream().anyMatch(t -> processingEnv.getTypeUtils().isAssignable(clazz.asType(), t)))
msgFields.put(clazz, fields);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@
import org.apache.ignite.internal.managers.deployment.GridDeploymentManager;
import org.apache.ignite.internal.managers.discovery.DiscoveryLocalJoinData;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
import org.apache.ignite.internal.managers.encryption.GridEncryptionManager;
import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
import org.apache.ignite.internal.managers.failover.GridFailoverManager;
Expand Down Expand Up @@ -214,6 +215,7 @@
import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
import org.apache.ignite.spi.IgniteSpi;
import org.apache.ignite.spi.IgniteSpiVersionCheckException;
import org.apache.ignite.spi.discovery.DiscoverySpi;
import org.apache.ignite.spi.discovery.isolated.IsolatedDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
import org.apache.ignite.spi.tracing.TracingConfigurationManager;
Expand Down Expand Up @@ -1315,9 +1317,6 @@ else if (e instanceof IgniteCheckedException)
private void initMessageFactory() throws IgniteCheckedException {
MessageFactoryProvider[] msgs = ctx.plugins().extensions(MessageFactoryProvider.class);

if (msgs == null)
msgs = new MessageFactoryProvider[0];

List<MessageFactoryProvider> compMsgs = new ArrayList<>();

compMsgs.add(new CoreMessagesProvider(ctx.marshaller(), ctx.marshallerContext().jdkMarshaller(),
Expand All @@ -1330,6 +1329,15 @@ private void initMessageFactory() throws IgniteCheckedException {
compMsgs.add(f);
}

DiscoverySpi discoSpi = ctx.config().getDiscoverySpi();

if (discoSpi instanceof IgniteDiscoverySpi) {
MessageFactoryProvider discoMsgs = ((IgniteDiscoverySpi)discoSpi).messageFactoryProvider();

if (discoMsgs != null)
compMsgs.add(discoMsgs);
}

if (!compMsgs.isEmpty())
msgs = F.concat(msgs, compMsgs.toArray(new MessageFactoryProvider[compMsgs.size()]));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.UUID;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
import org.apache.ignite.spi.discovery.DiscoverySpi;

/**
Expand Down Expand Up @@ -57,4 +58,9 @@ public interface IgniteDiscoverySpi extends DiscoverySpi {
* @param err Connection error.
*/
public void resolveCommunicationFailure(ClusterNode node, Exception err);

/** @return Message factory provider. */
public default MessageFactoryProvider messageFactoryProvider() {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.ignite.internal.processors.query.h2.twostep.msg;

import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.Order;
import org.h2.value.Value;
import org.h2.value.ValueNull;

Expand All @@ -29,10 +28,6 @@ public class GridH2Null extends GridH2ValueMessage {
/** */
public static GridH2Null INSTANCE = new GridH2Null();

/** Dummy field to use codegen serializer. */
@Order(0)
byte dummy;

/**
* Disallow new instance creation.
*/
Expand All @@ -45,7 +40,6 @@ private GridH2Null() {
return ValueNull.INSTANCE;
}


/** {@inheritDoc} */
@Override public String toString() {
return "NULL";
Expand Down
6 changes: 6 additions & 0 deletions modules/zookeeper/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@
<artifactId>ignite-core</artifactId>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>ignite-codegen</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.spi.IgniteSpiAdapter;
import org.apache.ignite.spi.IgniteSpiConfiguration;
Expand All @@ -57,6 +58,7 @@
import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator;
import org.apache.ignite.spi.discovery.DiscoverySpiOrderSupport;
import org.apache.ignite.spi.discovery.zk.internal.ZkIgnitePaths;
import org.apache.ignite.spi.discovery.zk.internal.ZkMessageFactory;
import org.apache.ignite.spi.discovery.zk.internal.ZookeeperClusterNode;
import org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoveryImpl;
import org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoveryStatistics;
Expand Down Expand Up @@ -554,6 +556,11 @@ private ZookeeperClusterNode initLocalNode() {
return locNode;
}

/** {@inheritDoc} */
@Override public MessageFactoryProvider messageFactoryProvider() {
return new ZkMessageFactory();
}

/** {@inheritDoc} */
@Override public String toString() {
return S.toString(ZookeeperDiscoverySpi.class, this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@
import java.util.zip.InflaterInputStream;
import org.apache.ignite.internal.direct.DirectMessageReader;
import org.apache.ignite.internal.direct.DirectMessageWriter;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.marshaller.jdk.JdkMarshaller;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import org.apache.ignite.plugin.extensions.communication.MessageSerializer;
Expand All @@ -42,25 +40,14 @@
* Class is responsible for serializing discovery messages using RU-ready {@link MessageSerializer} mechanism.
*/
public class DiscoveryMessageParser {
/** Leading byte for messages use {@link JdkMarshaller} for serialization. */
// TODO: remove these flags after refactoring all discovery messages.
private static final byte JAVA_SERIALIZATION = (byte)1;

/** Leading byte for messages use {@link MessageSerializer} for serialization. */
private static final byte MESSAGE_SERIALIZATION = (byte)2;

/** Size for an intermediate buffer for serializing discovery messages. */
private static final int MSG_BUFFER_SIZE = 100;

/** */
private final JdkMarshaller jdkMarshaller;

/** */
private final MessageFactory msgFactory;

/** */
public DiscoveryMessageParser(JdkMarshaller jdkMarshaller, MessageFactory msgFactory) {
this.jdkMarshaller = jdkMarshaller;
public DiscoveryMessageParser(MessageFactory msgFactory) {
this.msgFactory = msgFactory;
}

Expand All @@ -69,16 +56,7 @@ public byte[] marshalZip(DiscoverySpiCustomMessage msg) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();

try (DeflaterOutputStream out = new DeflaterOutputStream(baos)) {
if (msg instanceof Message) {
out.write(MESSAGE_SERIALIZATION);

serializeMessage((Message)msg, out);
}
else {
out.write(JAVA_SERIALIZATION);

U.marshal(jdkMarshaller, msg, out);
}
serializeMessage((Message)msg, out);
}
catch (Exception e) {
throw new IgniteSpiException("Failed to serialize message: " + msg, e);
Expand All @@ -93,14 +71,6 @@ public DiscoverySpiCustomMessage unmarshalZip(byte[] bytes) {
ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
InflaterInputStream in = new InflaterInputStream(bais)
) {
byte mode = (byte)in.read();

if (mode == JAVA_SERIALIZATION)
return U.unmarshal(jdkMarshaller, in, U.gridClassLoader());

if (MESSAGE_SERIALIZATION != mode)
throw new IOException("Received unexpected byte while reading discovery message: " + mode);

return (DiscoverySpiCustomMessage)deserializeMessage(in);
}
catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,35 @@
package org.apache.ignite.spi.discovery.zk.internal;

import java.util.UUID;
import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.jetbrains.annotations.Nullable;

/**
*
*/
class ZkCommunicationErrorResolveFinishMessage implements DiscoverySpiCustomMessage, ZkInternalMessage {
class ZkCommunicationErrorResolveFinishMessage implements DiscoverySpiCustomMessage, ZkInternalMessage, Message {
/** */
private static final long serialVersionUID = 0L;

/** */
final UUID futId;
@Order(0)
UUID futId;

/** */
final long topVer;
@Order(1)
long topVer;

/** */
transient ZkCommunicationErrorResolveResult res;
ZkCommunicationErrorResolveResult res;

/** Constructor for {@link MessageFactory}. */
public ZkCommunicationErrorResolveFinishMessage() {
// No-op.
}

/**
* @param futId Future ID.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,28 @@
package org.apache.ignite.spi.discovery.zk.internal;

import java.util.UUID;
import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.jetbrains.annotations.Nullable;

/**
* Zk Communication Error Resolve Start Message.
*/
public class ZkCommunicationErrorResolveStartMessage implements DiscoverySpiCustomMessage, ZkInternalMessage {
public class ZkCommunicationErrorResolveStartMessage implements DiscoverySpiCustomMessage, ZkInternalMessage, Message {
/** */
private static final long serialVersionUID = 0L;

/** */
final UUID id;
@Order(0)
UUID id;

/** Constructor for {@link MessageFactory}. */
public ZkCommunicationErrorResolveStartMessage() {
// No-op.
}

/**
* @param id Unique ID.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,32 @@

package org.apache.ignite.spi.discovery.zk.internal;

import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.jetbrains.annotations.Nullable;

/**
* Zk Force Node Fail Message.
*/
public class ZkForceNodeFailMessage implements DiscoverySpiCustomMessage, ZkInternalMessage {
public class ZkForceNodeFailMessage implements DiscoverySpiCustomMessage, ZkInternalMessage, Message {
/** */
private static final long serialVersionUID = 0L;

/** */
final long nodeInternalId;
@Order(0)
long nodeInternalId;

/** */
final String warning;
@Order(1)
String warning;

/** Constructor for {@link MessageFactory}. */
public ZkForceNodeFailMessage() {
// No-op.
}

/**
* @param nodeInternalId Node ID.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@

package org.apache.ignite.spi.discovery.zk.internal;

import java.io.Serializable;

/**
*
*/
class ZkInternalJoinErrorMessage implements ZkInternalMessage {
class ZkInternalJoinErrorMessage implements ZkInternalMessage, Serializable {
/** */
private static final long serialVersionUID = 0L;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,9 @@

package org.apache.ignite.spi.discovery.zk.internal;

import java.io.Serializable;

/**
*
*/
interface ZkInternalMessage extends Serializable {
interface ZkInternalMessage {
// No-op.
}
Loading
Loading