How does JGroups work

Each view is identified by a ViewId, which consists of the address of the view creator and a sequence number. ViewIds can be compared for equality and used as keys in hashmaps, as they implement equals() and hashCode(). Whenever a group splits into subgroups, e.g. due to a network partition, and the subgroups later merge back into one group, a MergeView is installed. The MergeView is a subclass of View and contains, as an additional instance variable, the list of views that were merged. As an example, if the group denoted by view V1:(p,q,r,s,t) splits into subgroups V2:(p,q,r) and V2:(s,t), the merged view might be V3:(p,q,r,s,t). In this case the MergeView would contain a list of 2 views: V2:(p,q,r) and V2:(s,t).
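
Applications can detect such a merge in the viewAccepted() callback; a minimal sketch (class and print statements are illustrative):

    import org.jgroups.MergeView;
    import org.jgroups.ReceiverAdapter;
    import org.jgroups.View;

    public class MergeAwareReceiver extends ReceiverAdapter {
        @Override
        public void viewAccepted(View view) {
            if (view instanceof MergeView) {
                MergeView mv = (MergeView) view;
                // the subgroup views that were merged into this view
                System.out.println("merged subgroups: " + mv.getSubgroups());
            } else {
                System.out.println("new view: " + view);
            }
        }
    }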

In order to join a group and send messages, a process has to create a channel. A channel is like a socket. When a client connects to a channel, it gives the name of the group it would like to join. Thus, a channel is, in its connected state, always associated with a particular group. The protocol stack takes care that channels with the same group name find each other: whenever a client connects to a channel with a given group name G, it tries to find existing channels with the same name and joins them, resulting in a new view being installed which contains the new member.

If no members exist, a new group will be created. A state transition diagram for the major states a channel can assume is shown in [ChannelStatesFig]. An attempt to perform certain operations which are only valid in the connected state (e.g. sending messages) will result in an exception. After a successful connection by a client, it moves to the connected state. Now the channel will receive messages from other members and may send messages to other members or to the group, and it will get notified when new members join or leave.

Getting the local address of a channel is guaranteed to be a valid operation in this state (see below). When the channel is disconnected, it moves back to the unconnected state.

Both a connected and unconnected channel may be closed, which makes the channel unusable for further operations.

Any attempt to do so will result in an exception. When a channel is closed directly from a connected state, it will first be disconnected, and then closed. A channel is created using a constructor that takes a props argument, which points to an XML file containing the configuration of the protocol stack to be used. If the props argument is null, the default properties will be used.
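
A minimal sketch of creating a channel from a configuration file (the file name udp.xml and the cluster name are illustrative):

    import org.jgroups.JChannel;

    public class CreateChannel {
        public static void main(String[] args) throws Exception {
            // props may name an XML file on the classpath or filesystem;
            // passing null would select the default stack configuration
            JChannel channel = new JChannel("udp.xml");
            channel.connect("demo-cluster");
            channel.close();
        }
    }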

An exception will be thrown if the channel cannot be created. Possible causes include protocols that were specified in the property argument but not found, or wrong parameters passed to protocols. The props argument can also point to a URL; in that case, an application downloads its protocol stack specification from a server, which allows for central administration of application properties.

The configuration is an XML file in which each element defines one protocol. Each protocol is implemented as a Java class. When a protocol stack is created based on such an XML configuration, the first element (e.g. "UDP") becomes the bottom-most layer, the second one is placed on top of the first, and so on: the stack is created from the bottom to the top.
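
A minimal illustrative stack (trimmed for brevity; not a recommended production configuration):

    <config>
        <UDP mcast_port="45588"/>
        <PING/>
        <pbcast.NAKACK/>
        <UNICAST/>
        <pbcast.STABLE/>
        <pbcast.GMS/>
    </config>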

Each element name has to be the name of a Java class that resides in the org.jgroups.protocols package. Note that only the base name has to be given, not the fully qualified class name (UDP instead of org.jgroups.protocols.UDP). If the protocol class is not found there, JGroups assumes that the name given is a fully qualified classname and will therefore try to instantiate that class. If this does not work either, an exception is thrown.

This allows protocol classes to reside in different packages altogether, e.g. in an application's own package. Usually, channels are created by passing the name of an XML configuration file to the JChannel constructor. On top of this declarative configuration, JGroups provides an API to create a channel programmatically.

The way to do this is to first create a JChannel, then an instance of ProtocolStack, then add all desired protocols to the stack, and finally call init() on the stack to set it up.

The rest, e.g. connecting the channel and setting a receiver, is the same as with declarative configuration. An example of how to programmatically create a channel is shown below (adapted from ProgrammaticChat). First, a JChannel is created (1). The false argument tells the channel not to create a ProtocolStack; this is needed because we will create one ourselves later and set it in the channel (2). Next, all protocols are added to the stack (3). Note that the order is from bottom (transport protocol) to top.
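
A condensed sketch, assuming the 3.x-style API (the protocol list is abbreviated; a real stack needs the full set of protocols):

    import org.jgroups.JChannel;
    import org.jgroups.protocols.*;
    import org.jgroups.protocols.pbcast.*;
    import org.jgroups.stack.ProtocolStack;

    public class ProgrammaticStack {
        public static void main(String[] args) throws Exception {
            JChannel ch = new JChannel(false);           // (1) don't create a ProtocolStack
            ProtocolStack stack = new ProtocolStack();   // (2) create one ourselves ...
            ch.setProtocolStack(stack);                  //     ... and set it in the channel

            stack.addProtocol(new UDP())                 // (3) bottom-most: the transport
                 .addProtocol(new PING())
                 .addProtocol(new FD())
                 .addProtocol(new VERIFY_SUSPECT())
                 .addProtocol(new NAKACK())
                 .addProtocol(new UNICAST())
                 .addProtocol(new STABLE())
                 .addProtocol(new GMS());

            stack.init();                                // (4) set up the stack

            ch.connect("chat-cluster");
        }
    }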

Once the stack is configured, we call ProtocolStack.init() (4), which sets up the stack. After this, the channel is ready to be used, and all subsequent actions (e.g. connect()) will work correctly. A channel can also be given a logical name, which might, for instance, show the function of the channel. If no logical name is set, JGroups generates one, using the hostname and a random number, e.g. linux-3442.

Setting the logical name (e.g. via JChannel.setName()) must be done before connecting a channel. Note that the logical name stays with a channel until the channel is destroyed, whereas a UUID is created on each connection. As an example, a channel might carry the logical name mac together with a UUID and the physical (IP) address of its socket. Since 2.12, address generation is pluggable. This means that an application can determine what kind of addresses it uses. This can be used, for example, to pass additional data around with an address, such as information about the location of the node to which the address is assigned.

To use custom addresses, an implementation of org.jgroups.stack.AddressGenerator has to be written and set in the channel. Any custom address class (say, CustomAddress) needs to get registered with the ClassConfigurator in order to marshal it correctly; this has to be done before the channel is connected. An example of an address subclass is org.jgroups.util.PayloadUUID.
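
A sketch of plugging in a generator, assuming the 2.12/3.x API (for brevity the generator below returns a standard UUID; a real implementation would return a custom subclass, registered as shown in the comment):

    import org.jgroups.Address;
    import org.jgroups.JChannel;
    import org.jgroups.stack.AddressGenerator;
    import org.jgroups.util.UUID;

    public class CustomAddressExample {
        public static void main(String[] args) throws Exception {
            JChannel ch = new JChannel("udp.xml");

            // For a real custom address class, register it first, e.g.:
            // ClassConfigurator.add((short) 3500, CustomAddress.class);

            ch.setAddressGenerator(new AddressGenerator() {
                public Address generateAddress() {
                    // a real implementation would return a custom subclass,
                    // e.g. one carrying the node's location as payload
                    return UUID.randomUUID();
                }
            });
            ch.connect("demo-cluster");   // generator must be set before connect
        }
    }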

When a client wants to join a cluster, it connects to a channel, giving the name of the cluster to be joined. All channels that call connect() with the same name form a cluster. Messages sent on any channel in the cluster will be received by all members (including the one who sent it). The connect() method returns as soon as the cluster has been joined successfully. If the channel is in the closed state (see channel states), an exception will be thrown. If there are no other members, i.e. if we are the first member, a new cluster is created.

The first member of a cluster becomes its coordinator. A coordinator is in charge of installing new views whenever the membership changes. Clients can also join a cluster and fetch the cluster state in one operation.

The best way to conceptualize the connect-and-fetch-state connect() method is to think of it as an invocation of the regular connect() and getState() methods executed in succession. However, there are several advantages to using it over a regular connect() followed by getState(). First of all, the underlying message exchange is heavily optimized, especially if the flush protocol is used. Just as in a regular connect(), the cluster name represents the cluster to be joined.

The target parameter indicates the cluster member to fetch the state from. A null target indicates that the state should be fetched from the cluster coordinator. If the state should be fetched from a particular member other than the coordinator, clients can simply provide the address of that member. The timeout parameter bounds the entire join-and-fetch operation.
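
A sketch (cluster name and timeout are illustrative; channel is an existing JChannel):

    // join "demo-cluster" and fetch the state from the coordinator (target == null),
    // bounding the whole operation by 5000 ms
    channel.connect("demo-cluster", null, 5000);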

An exception will be thrown if the timeout is exceeded. Method getAddress() returns the address of the channel.

The address may or may not be available when a channel is in the unconnected state. Method getView() returns the current view of the channel. It is updated every time a new view is installed (viewAccepted() callback). Calling this method on an unconnected or closed channel is implementation defined. A channel may return null, or it may return the last view it knew of. Once the channel is connected, messages can be sent using one of the send() methods:

The first send() method has only one argument, which is the message to be sent. When the destination is null, the message will be sent to all members of the cluster (including itself). The remaining send() methods are helper methods; they take either a byte[] buffer or a serializable object, create a Message and call send(Message). If the channel is not connected, or was closed, an exception will be thrown upon attempting to send a message.

The null value as destination address means that the message will be sent to all members in the cluster. The sample code below determines the coordinator (the first member of the view, obtained via JChannel.getView()) and sends it a "hello world" message. While JGroups guarantees that a message will eventually be delivered at all non-faulty members, sometimes this might take a while.
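
A sketch of that unicast (assuming the 3.x Message constructor; channel is a connected JChannel):

    // send "hello world" to the coordinator, the first member of the view
    Address coordinator = channel.getView().getMembers().get(0);
    channel.send(new Message(coordinator, "hello world"));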

For example, if we have a retransmission protocol based on negative acknowledgments, and the last message sent is lost, then the receiver(s) will have to wait until the stability protocol notices that the message has been lost, before it can be retransmitted. This can be changed by setting the RSVP flag in a message: when this flag is encountered, the message send blocks until all members have acknowledged reception of the message (of course excluding members which crashed or left meanwhile).
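
A sketch (assuming the 3.x flag API and an RSVP protocol present in the stack):

    // send() blocks until all current members have acked the message
    Message msg = new Message(null, "work-unit-done");
    msg.setFlag(Message.Flag.RSVP);
    channel.send(msg);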

For example, when completing a unit of work (i.e. a set of messages that belong together), an application might want to be sure that all members received the last message. Note that RSVP also works for sending a message to a unicast destination. Method send() will return as soon as it has received acks from all current members; if they do not arrive in time, a runtime exception is thrown. The application can choose to catch this runtime exception and do something with it, e.g. retransmit the message. As an example, if RpcDispatcher.callRemoteMethodsWithFuture() is invoked with call options containing flag RSVP, then the future would only be returned once all responses have been received. This is clearly undesirable behavior.

Method receive() in ReceiverAdapter (or Receiver) can be overridden to receive messages; further callbacks handle views and state transfers. A Receiver can be registered with a channel using JChannel.setReceiver(). All received messages, view changes and state transfer requests will then invoke callbacks on the registered Receiver. As shown above, the viewAccepted() callback of ReceiverAdapter can be used to get callbacks whenever a cluster membership change occurs.

As discussed in ReceiverAdapter , code in callbacks must avoid anything that takes a lot of time, or blocks; JGroups invokes this callback as part of the view installation, and if this user code blocks, the view installation would block, too.
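
A minimal receiver sketch (print statements are illustrative):

    import org.jgroups.JChannel;
    import org.jgroups.Message;
    import org.jgroups.ReceiverAdapter;
    import org.jgroups.View;

    public class MyReceiver extends ReceiverAdapter {
        @Override
        public void receive(Message msg) {
            // keep this short: the callback runs on a JGroups thread
            System.out.println("from " + msg.getSrc() + ": " + msg.getObject());
        }

        @Override
        public void viewAccepted(View view) {
            System.out.println("members: " + view.getMembers());
        }
    }

    // usage: channel.setReceiver(new MyReceiver());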

A newly joined member may want to retrieve the state of the cluster before starting work. This is done with getState(). This method returns the state of one member (usually of the oldest member, the coordinator). The target parameter can usually be null, to ask the current coordinator for the state. If the timeout (in milliseconds) elapses before the state is fetched, an exception will be thrown. A timeout of 0 waits until the entire state has been transferred.

To participate in state transfer, both the state provider and the state requester have to implement the following callbacks from ReceiverAdapter (Receiver). Method getState() is invoked on the state provider (usually the coordinator). It needs to write its state to the output stream given. The setState() method is invoked on the state requester; this is the member which called JChannel.getState().

It needs to read its state from the input stream and set its internal state to it. For example, if member D fetches the state from coordinator A: D calls JChannel.getState(); A writes its state to the output stream passed as a parameter to getState(); D reads the state from the input stream and sets its internal state to it, overriding any previous data.

Note that the getState() call will only return after the state has been transferred successfully, a timeout elapsed, or either the state provider or requester threw an exception. Such an exception will be re-thrown by getState().
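
A sketch of both callbacks, for a state consisting of a list of strings (class and field names are illustrative):

    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.util.LinkedList;
    import java.util.List;

    import org.jgroups.ReceiverAdapter;
    import org.jgroups.util.Util;

    public class StateExample extends ReceiverAdapter {
        private final List<String> state = new LinkedList<String>();

        @Override
        public void getState(OutputStream output) throws Exception {
            synchronized (state) {   // keep incoming modifications out during transfer
                Util.objectToStream(state, new DataOutputStream(output));
            }
        }

        @Override
        @SuppressWarnings("unchecked")
        public void setState(InputStream input) throws Exception {
            List<String> new_state =
                (List<String>) Util.objectFromStream(new DataInputStream(input));
            synchronized (state) {
                state.clear();
                state.addAll(new_state);   // override any previous contents
            }
        }
    }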

The getState() implementation synchronizes on the state (so no incoming messages can modify it during the state transfer), and uses the JGroups utility method Util.objectToStream() to write it. If a lot of smaller fragments are written to an output stream, it is best to wrap the output stream into a BufferedOutputStream first. The setState() implementation uses the corresponding Util.objectFromStream() method to read the state and set its own state to it. In order to use state transfer, a state transfer protocol has to be included in the configuration. More details on the protocols can be found in the protocols list section.

STATE_TRANSFER is the original state transfer protocol, which used to transfer byte[] buffers. It still does that, but the buffers are internally converted into calls to the getState() and setState() callbacks, which use input and output streams. Note that, because byte[] buffers are used underneath, this protocol should not be used for the transfer of large states.

The streaming state transfer protocol, in contrast, sends the entire state across from the provider to the requester in configurable chunks, so that memory consumption is minimal.

Method disconnect() leaves a cluster; it will have no effect if the channel is already in the disconnected or closed state. If connected, it will leave the cluster. This is done transparently for a channel user by sending a leave request to the current coordinator. The latter will subsequently remove the leaving node from the view and install a new view in all remaining members. After a successful disconnect, the channel will be in the unconnected state, and may subsequently be re-connected. To destroy a channel instance (destroy the associated protocol stack and release all resources), method close() is used:

The close() method moves the channel to the closed state, in which no further operations are allowed (most throw an exception when invoked on a closed channel). Building blocks are layered on top of channels, and can be used instead of channels whenever a higher-level interface is required.

Whereas channels are simple socket-like constructs, building blocks may offer a far more sophisticated interface. Building blocks are located in the org.jgroups.blocks package. Channels are simple primitives to asynchronously send and receive messages. However, a significant number of communication patterns in group communication require synchronous communication. For example, a sender would like to send a message to the group and wait for all responses.

Or another application would like to send a message to the group and wait only until the majority of the receivers have sent a response, or until a timeout occurred. MessageDispatcher provides blocking and non-blocking request sending and response correlation. It offers synchronous as well as asynchronous message sending with request-response correlation, e.g. matching one or multiple responses with the original request. An example of using this class would be to send a request message to all cluster members, and block until all responses have been received, or until a timeout has elapsed.

Contrary to RpcDispatcher, MessageDispatcher deals with sending message requests and correlating message responses, while RpcDispatcher deals with invoking method calls and correlating responses. RpcDispatcher extends MessageDispatcher and offers an even higher level of abstraction. Both MessageDispatcher and RpcDispatcher sit on top of a channel; therefore an instance of MessageDispatcher is created with a channel as argument. It can then be used in both the client and the server role: a client sends requests and receives responses, and a server receives requests and sends responses.

MessageDispatcher allows an application to be both at the same time. To be able to serve requests in the server role, the RequestHandler.handle() method has to be implemented. The handle() method is called whenever a request is received.

It must return a value (which must be serializable, but can be null) or throw an exception. The returned value will be sent to the sender, and exceptions are also propagated to the sender. Every message sending in MessageDispatcher (or request invocation in RpcDispatcher) is governed by an instance of RequestOptions.

This is a class which can be passed to a call to define the various options related to the call, e.g. the response mode: this determines whether the call is blocking and, if so, how long it should block.

The modes are: GET_FIRST (return the first response received), GET_ALL (wait for all responses, minus those from suspected or left members), GET_MAJORITY (wait for a majority of responses) and GET_NONE (return immediately; fire-and-forget). A timeout of 0 means to wait forever. Anycasting: if enabled, and the destination list consists e.g. of C and D, this will send the request to C and D only, as unicasts, which is better if we use a transport such as TCP, which cannot use IP multicasting (sending 1 packet to reach all members). Response filter: a RspFilter allows for filtering of responses and user-defined termination of a call.

For example, if we expect responses from 10 members, but can return after having received 3 non-null responses, a RspFilter could be used.

See Response filters for a discussion on response filters. Scope: a short which defines a scope, allowing for concurrent delivery of messages from the same sender. See Scopes: concurrent message delivery for messages from the same sender for a discussion on scopes. Flags: the various flags to be passed to the message; see the section on message flags for details.

Exclusion list: here we can pass a list of members (addresses) that should be excluded from the call. An illustrative combination of several of these options is sketched below. Method castMessage() sends a message to all members defined in dests; if dests is null, the message will be sent to all members of the current cluster. Note that a possible destination set in the message will be overridden. If a message is sent synchronously (defined by the response mode in the options), the call returns a RspList with one Rsp instance per target member. A Rsp instance contains the response value (or null), an exception if the target's handle() method threw an exception, whether the target member was suspected, and so on. See the example below for more details.
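
A hypothetical combination of options, assuming the 3.x RequestOptions API (the excluded address would be obtained e.g. from the view):

    import org.jgroups.Address;
    import org.jgroups.Message;
    import org.jgroups.blocks.RequestOptions;
    import org.jgroups.blocks.ResponseMode;

    // block up to 1000 ms for all responses, send unicasts to the target set,
    // and skip the member addr_c
    static RequestOptions makeOptions(Address addr_c) {
        return new RequestOptions(ResponseMode.GET_ALL, 1000)
                .setAnycasting(true)              // unicast to each destination
                .setFlags(Message.Flag.OOB)       // example message flag
                .setExclusionList(addr_c);        // don't send to addr_c
    }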

The future can be used to fetch the response list (now or later), and it also allows for the installation of a callback which will be invoked whenever the future is done.

See Asynchronous calls with futures for details on how to use NotifyingFutures. Method sendMessage() sends a unicast message to a single member: the destination of the message has to be non-null (a valid address of a member), and the mode argument is ignored (it is by default set to ResponseMode.GET_FIRST). One advantage of using this building block is that failed members are removed from the set of expected responses. For example, when sending a message to 10 members and waiting for all responses, and 2 members crash before being able to send a response, the call will return with 8 valid responses and 2 marked as failed.

The return value of castMessage() is a RspList, which contains all responses (not all methods are shown). Note that a response is marked as not received only as long as no response has yet been received and the member has not been marked as failed. When a non-null list of addresses is passed as the destination list to MessageDispatcher.castMessage(), this does not mean that only the members in the list will receive the message; it means that we will only wait for responses from those members.

If we want to restrict the reception of a message to the destination members, there are a few ways to do this: (1) use anycasting, e.g. with a destination list of 2 members, the transport will then send 2 unicasts instead of a multicast; (2) use exclusion lists, excluding the members which should not receive the message. The example below starts with the creation of a channel. Next, an instance of MessageDispatcher is created on top of the channel. Then the channel is connected. The MessageDispatcher will from now on send requests, receive matching responses (client role) and receive requests and send responses (server role).

We then send 10 messages to the group and wait for all responses. The timeout argument is 0, which causes the call to block until all responses have been received.
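
A sketch of that example (cluster name and payloads are illustrative):

    import org.jgroups.JChannel;
    import org.jgroups.Message;
    import org.jgroups.blocks.MessageDispatcher;
    import org.jgroups.blocks.RequestHandler;
    import org.jgroups.blocks.RequestOptions;
    import org.jgroups.blocks.ResponseMode;
    import org.jgroups.util.RspList;

    public class MessageDispatcherExample implements RequestHandler {
        public static void main(String[] args) throws Exception {
            MessageDispatcherExample server = new MessageDispatcherExample();
            JChannel channel = new JChannel("udp.xml");
            MessageDispatcher disp = new MessageDispatcher(channel, null, null, server);
            channel.connect("MessageDispatcherTestGroup");

            RequestOptions opts = new RequestOptions(ResponseMode.GET_ALL, 0); // 0: wait for all
            for (int i = 0; i < 10; i++) {
                RspList<String> rsps =
                    disp.castMessage(null, new Message(null, "number-" + i), opts);
                System.out.println("responses:\n" + rsps);
            }
            channel.close();
        }

        public Object handle(Message msg) {         // server role
            System.out.println("received: " + msg.getObject());
            return "success";
        }
    }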

The handle() method simply prints out a message and returns a string. This will be sent back to the caller as a response value (in Rsp.getValue()). Had the call thrown an exception, Rsp.getException() would be set instead. RpcDispatcher is derived from MessageDispatcher.

It allows a programmer to invoke remote methods in all (or single) cluster members and optionally wait for the return value(s). An application will typically create a channel first, and then create an RpcDispatcher on top of it. RpcDispatcher can be used to invoke remote methods (client role) and at the same time be called by other members (server role).

Compared to MessageDispatcher, no handle() method needs to be implemented. Instead, the methods to be called can be placed directly in the class using regular method definitions (see example below). The methods will get invoked using reflection. The family of callRemoteMethods() methods is invoked with a list of receiver addresses. If null, the method will be invoked in all cluster members (including the sender). Each call takes the target members to invoke it on (null means invoke on all cluster members), a method and a RequestOptions instance.

The method can be given as (1) the method name, (2) the arguments and (3) the argument types; alternatively, a MethodCall containing a java.lang.reflect.Method and the arguments can be given instead. The family of callRemoteMethod() methods takes almost the same parameters, except that there is only one destination address instead of a list. If the dest argument is null, the call will fail.

The callRemoteMethod() calls return the actual result (of type T), or throw an exception if the method threw an exception on the target member. There is a runtime exception if a method cannot be resolved. Note that we could also use method IDs and the MethodLookup interface to resolve methods, which is faster and makes every RPC carry less data across the wire. To see how this is done, have a look at some of the MethodLookup implementations shipped with JGroups. The example class below defines a method print(), which will be called subsequently.

The entry point start() creates a channel and an RpcDispatcher which is layered on top. Method callRemoteMethods() then invokes the remote print() in all cluster members (including the caller).
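
A sketch of the example (class, cluster name and loop are illustrative):

    import org.jgroups.JChannel;
    import org.jgroups.blocks.RequestOptions;
    import org.jgroups.blocks.ResponseMode;
    import org.jgroups.blocks.RpcDispatcher;
    import org.jgroups.util.RspList;

    public class RpcDispatcherExample {
        JChannel channel;
        RpcDispatcher disp;

        // the method invoked remotely, via reflection
        public static int print(int number) {
            System.out.println("print(" + number + ")");
            return number * 2;
        }

        public void start(String props) throws Exception {
            channel = new JChannel(props);
            disp = new RpcDispatcher(channel, this);  // 'this' provides the server-side methods
            channel.connect("RpcDispatcherTestGroup");

            for (int i = 0; i < 10; i++) {
                RspList<Integer> rsps = disp.callRemoteMethods(
                    null,                                        // null: all members
                    "print",
                    new Object[]{i}, new Class[]{int.class},
                    new RequestOptions(ResponseMode.GET_ALL, 5000));
                System.out.println("responses: " + rsps);
            }
            channel.close();
        }
    }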

When all responses have been received, the call returns and the responses are printed. As can be seen, the RpcDispatcher building block reduces the amount of code that needs to be written to implement RPC-based group communication applications, by providing a higher abstraction level between the application and the primitive channels. When invoking a synchronous call, the calling thread is blocked until the response (or responses) has been received. A Future allows a caller to return immediately and grab the result(s) later.

A NotifyingFuture extends java.util.concurrent.Future, with its regular methods such as isDone(), get() and cancel(). This is shown in the following code:
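
A sketch, reusing the print() example above (the *WithFuture call variants return a NotifyingFuture):

    import org.jgroups.blocks.MethodCall;
    import org.jgroups.blocks.RequestOptions;
    import org.jgroups.blocks.ResponseMode;
    import org.jgroups.util.NotifyingFuture;
    import org.jgroups.util.RspList;

    // invoke print(3) in all members, but return immediately
    MethodCall call = new MethodCall("print", new Object[]{3}, new Class[]{int.class});
    NotifyingFuture<RspList<Integer>> future = disp.callRemoteMethodsWithFuture(
            null, call, new RequestOptions(ResponseMode.GET_ALL, 5000));

    // ... do other work ...

    RspList<Integer> rsps = future.get();  // now block (or poll isDone()) for the results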

The RspFilter interface has two methods: isAcceptable(Object response, Address sender), which determines whether a response is acceptable, and needMoreResponses(), which determines whether the call should keep waiting for further responses. As an example, the filter below accepts all responses whose value is greater than 1, and returns as soon as it has received 2 responses which satisfy this condition.
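
A sketch of such a filter (class name is illustrative):

    import org.jgroups.Address;
    import org.jgroups.blocks.RspFilter;

    public class GreaterThanOneFilter implements RspFilter {
        private int acceptable;   // number of acceptable responses so far

        public boolean isAcceptable(Object response, Address sender) {
            boolean ok = response instanceof Integer && (Integer) response > 1;
            if (ok)
                acceptable++;
            return ok;
        }

        public boolean needMoreResponses() {
            return acceptable < 2;   // stop waiting after 2 acceptable responses
        }
    }

    // usage: new RequestOptions(ResponseMode.GET_ALL, 5000)
    //            .setRspFilter(new GreaterThanOneFilter())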

By default, a message received by a MessageDispatcher or RpcDispatcher is dispatched into application code by calling method handle() from RequestHandler. The return value of handle() is then sent back to the sender of the message. The invocation is synchronous, i.e. the thread which carries the message up the stack is blocked until handle() returns.

The thread is therefore unusable for the duration of the method invocation. If the invocation takes a while, e.g. because a database is accessed or locks are acquired, this can quickly lead to the thread pool being exhausted, or to many messages getting queued if the pool has an associated queue. Therefore a new way of dispatching messages to the application was devised: the asynchronous invocation API. Extending RequestHandler, interface AsyncRequestHandler adds an additional method taking a request message and a Response object.

The request message contains the same information as before (e.g. the payload). The Response argument is used to send a reply (if needed) at a later time, when processing is done. Response encapsulates information about the request (e.g. a request ID). The default implementation still uses the synchronous invocation style: method handle() is called, which synchronously calls into application code and returns a result, which is subsequently sent back to the sender of the request message.

However, an application could subclass MessageDispatcher or RpcDispatcher (as done in Infinispan), or set a custom request handler via MessageDispatcher.setRequestHandler(), and dispatch the request to a thread pool instead. The thread which carried the request message from the network up to this point would then be released immediately and could be used for other messages.
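
A sketch of an asynchronous handler (the worker pool and reply payload are illustrative):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import org.jgroups.Message;
    import org.jgroups.blocks.AsyncRequestHandler;
    import org.jgroups.blocks.Response;

    public class AsyncHandler implements AsyncRequestHandler {
        private final ExecutorService pool = Executors.newFixedThreadPool(8);

        public void handle(final Message request, final Response response) {
            // hand off to the pool; the JGroups thread returns immediately
            pool.execute(new Runnable() {
                public void run() {
                    Object result = "processed " + request.getObject();
                    if (response != null)
                        response.send(result, false);  // false: result is not an exception
                }
            });
        }

        public Object handle(Message msg) throws Exception {
            return null;   // not used when async dispatching is enabled
        }
    }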

To set the mode which is used, method MessageDispatcher.asyncDispatching(boolean) can be used. This can be changed even at runtime, to switch between sync and async invocation style. Asynchronous invocation is typically used in conjunction with an application thread pool.

For example, all OOB calls could be dispatched directly to the thread pool, as ordering of OOB requests is not important, but regular requests should be added to a queue where they are processed sequentially.

The main benefit here is that request dispatching (and ordering) is now under application control, if the application wants that. If not, we can still use synchronous invocation. A good example where asynchronous invocation makes sense is replicated web sessions.

If a cluster node A hosts a number of web sessions, replication of updates across the cluster generates messages from A. Because JGroups delivers messages from the same sender sequentially, even updates to unrelated web sessions are delivered in strict order.

With asynchronous invocation, the application could devise a dispatching strategy which assigns updates to different (unrelated) web sessions to any available thread from the pool, but queues updates to the same session and processes those by the same thread, to provide ordering of updates to the same session. ReplicatedHashMap was written as a demo of how state can be shared between nodes of a cluster.

It has never been heavily tested and is therefore not meant to be used in production. A ReplicatedHashMap uses a concurrent hashmap internally and allows the creation of several hashmap instances in different processes. All of these instances have exactly the same state at all times. When creating such an instance, a cluster name determines which cluster of replicated hashmaps will be joined.

The new instance will then query the state from existing members and update itself before starting to service requests. If there are no existing members, it will simply start with an empty state. Modifications such as put(), clear() or remove() will be propagated in orderly fashion to all replicas. Read-only requests such as get() will only be invoked on the local hashmap. Since both keys and values of a hashtable will be sent across the network, they have to be serializable.

Putting a non-serializable value in the map will result in an exception at marshalling time. A ReplicatedHashMap allows one to register for notifications, e.g. when an entry is added, changed or removed. All listeners will get notified when such an event occurs. Notification is always local; for example, in the case of removing an element, the element is first removed in all replicas, which then notify their listener(s) of the removal (after the fact).

ReplicatedHashMap thus allows members in a group to share common state across process and machine boundaries. ReplCache goes a step further: its put(K,V,R) method has a replication count R which determines on how many cluster members key K and value V should be stored. If one of those members goes down, or leaves the cluster, then a different member will be told to store K and V. ReplCache tries to always have R cluster members store K and V. A replication count of -1 means that a given key and value should be stored on all cluster members.

The mapping between a key K and the cluster member(s) on which K will be stored is always deterministic, and is computed using a consistent hash function. Note that this class, too, was written as a demo of how state can be shared between nodes of a cluster. Distributed locking is implemented as a protocol and is used via org.jgroups.blocks.locking.LockService.

LockService talks to the locking protocol via events. The main abstraction of a distributed lock is an implementation of java.util.concurrent.locks.Lock. In the example below, we create a channel and a LockService, then connect the channel. Then we grab a lock named "mylock", which we lock and subsequently unlock. Note that the owner of a lock is always a given thread in a cluster, so the owner is the combination of the JGroups address and the thread ID.
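
A sketch (the stack must include a locking protocol such as CENTRAL_LOCK; file and cluster names are illustrative):

    import java.util.concurrent.locks.Lock;

    import org.jgroups.JChannel;
    import org.jgroups.blocks.locking.LockService;

    public class LockExample {
        public static void main(String[] args) throws Exception {
            JChannel ch = new JChannel("locking.xml");   // config with a locking protocol
            LockService lock_service = new LockService(ch);
            ch.connect("lock-cluster");

            Lock lock = lock_service.getLock("mylock");  // a cluster-wide named lock
            lock.lock();
            try {
                // access the resource protected by "mylock"
            } finally {
                lock.unlock();
            }
            ch.close();
        }
    }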

This means that different threads inside the same JVM trying to access the same named lock will compete for it: if one thread (say, thread-4) grabs the lock first, then thread-5 will block until thread-4 releases the lock. JGroups includes a demo (org.jgroups.demos.LockServiceDemo), which can be used to interactively experiment with distributed locks.

LockServiceDemo -h dumps all command line options. Note that the locking protocol has to be placed at or towards the top of the stack (close to the channel), because it requires the reliable unicasts and multicasts provided by the protocols below it (e.g. UNICAST and NAKACK).

Consider a merge scenario: suppose a cluster {A,B,C,D} splits into {A,B} and {C,D}, and B and D now each acquire the lock "mylock". Both acquisitions succeed, because each partition has its own coordinator. When the partitions merge, only A remains a coordinator, but C ceases to be a coordinator. Problem: D still holds a lock which should actually be invalid! Therefore the recommended solution here is for nodes to listen for MergeView changes if they expect merging to occur, and to re-acquire all of their locks after a merge, e.g. from the viewAccepted() callback.

ExecutionService implements java.util.concurrent.ExecutorService and distributes tasks submitted to it across the cluster, trying to distribute the tasks to the cluster members as evenly as possible. When a cluster member leaves or dies, the tasks it was processing are re-distributed to other members in the cluster.

ExecutionService talks to the executing protocol via events. The main abstraction is an implementation of java.util.concurrent.ExecutorService. All methods are supported. The restriction is that the callable or runnable must be Serializable, Externalizable or Streamable. Also, the result produced by the future needs to be Serializable, Externalizable or Streamable. If a result is not, then a NotSerializableException with the name of the class will be returned to the Future as an exception cause.
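
A sketch of submitting a task, assuming a stack with an executing protocol such as CENTRAL_EXECUTOR (the Square callable is illustrative):

    import java.io.Serializable;
    import java.util.concurrent.Callable;
    import java.util.concurrent.Future;

    import org.jgroups.JChannel;
    import org.jgroups.blocks.executor.ExecutionService;

    public class SubmitExample {
        // tasks must be Serializable (or Externalizable/Streamable)
        static class Square implements Callable<Integer>, Serializable {
            private final int n;
            Square(int n) { this.n = n; }
            public Integer call() { return n * n; }
        }

        public static void main(String[] args) throws Exception {
            JChannel ch = new JChannel("executing.xml"); // config with an executing protocol
            ExecutionService exec_service = new ExecutionService(ch);
            ch.connect("exec-cluster");

            Future<Integer> future = exec_service.submit(new Square(7));
            System.out.println("result: " + future.get()); // blocks until some node ran the task
            ch.close();
        }
    }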

In the example, we create a channel, then an ExecutionService, then connect the channel. Then we submit our callable, giving us a Future. Then we wait for the future to finish, returning our value, and do something with it. If any exception occurs, we print the stack trace of that exception. The ExecutionService is used as the producer in a producer-consumer pattern. There is a separate class that was written specifically as a consumer, which can be run on any node of the cluster.

This class is ExecutionRunner and implements java.lang.Runnable. A user is required to run one or more instances of an ExecutionRunner on a node of the cluster. By having a thread run one of these runners, that thread has volunteered to run any task that is submitted to the cluster via an ExecutionService.

This allows any node in the cluster to participate (or not) in the running of these tasks; a node can also optionally run more than one ExecutionRunner if it has additional capacity to do so. A runner will run indefinitely until the thread that is currently running it is interrupted.

If a task is running when the runner is interrupted, the task will be interrupted too. Below is an example of how simple it is to have a single node start and allow for 10 distributed tasks to be executed simultaneously on it. In the example, we create a channel, connect it, and then create an ExecutionRunner. Then we create a java.util.concurrent.ExecutorService that is used to start 10 threads, each of which runs the ExecutionRunner. This allows this node to have 10 threads actively accepting and working on requests submitted via any ExecutionService in the cluster.
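
A sketch of such a consumer node:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import org.jgroups.JChannel;
    import org.jgroups.blocks.executor.ExecutionRunner;

    public class ConsumerNode {
        public static void main(String[] args) throws Exception {
            JChannel ch = new JChannel("executing.xml");
            ch.connect("exec-cluster");

            ExecutionRunner runner = new ExecutionRunner(ch);
            ExecutorService pool = Executors.newFixedThreadPool(10);
            for (int i = 0; i < 10; i++) {
                // each thread volunteers to execute tasks submitted to the cluster
                pool.submit(runner);
            }
        }
    }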

Since an ExecutionService does not allow non-serializable class instances to be sent across as tasks, there are two utility classes provided to get around this problem.

For users that are used to using a CompletionService with an Executor, there is an equivalent ExecutionCompletionService provided that offers the same functionality.

It would have been preferable to allow the same ExecutorCompletionService to be used, but due to its implementation using a non-serializable object, the ExecutionCompletionService was implemented to be used instead, in conjunction with an ExecutionService. Also, a utility class was designed to help users submit tasks which use a non-serializable class.

All the arguments provided must still be serializable, as must the return object, as detailed previously. JGroups includes a demo (org.jgroups.demos.ExecutionServiceDemo), which can be used to interactively experiment with a distributed sort algorithm and its performance. This is for demonstration purposes, and performance should not be assumed to be better than local execution.

ExecutionServiceDemo -h dumps all command line options. As with locking, the executing protocol has to be placed at or towards the top of the stack (close to the channel). Cluster-wide counters provide named counters (similar to AtomicLong) which can be changed atomically. Two nodes incrementing the same counter with initial value 10 will see 11 and 12 as results, respectively.
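
A sketch using CounterService (requires a COUNTER protocol in the stack; names are illustrative):

    import org.jgroups.JChannel;
    import org.jgroups.blocks.atomic.Counter;
    import org.jgroups.blocks.atomic.CounterService;

    public class CounterExample {
        public static void main(String[] args) throws Exception {
            JChannel ch = new JChannel("counter.xml");  // config with the COUNTER protocol
            CounterService counter_service = new CounterService(ch);
            ch.connect("counter-cluster");

            // creates the cluster-wide counter "mycounter" with initial value 10,
            // or returns the existing one
            Counter counter = counter_service.getOrCreateCounter("mycounter", 10);
            long value = counter.incrementAndGet();     // 11 on the first incrementing node
            System.out.println("counter = " + value);
            ch.close();
        }
    }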

In stop(), we shut down the thread pool and close the channel, causing this node to leave the cluster gracefully. Everybody else connected to this cluster will get a view change (viewAccepted() callback) notifying them of the termination of this node. The Entry inner class is a wrapper for the task, the address of the submitter, and the promise (similar to a Future) used to block on until the result has been received. The address of the submitter of the task is needed to send the result back. This is necessary when a node other than the originally assigned one takes over and processes a task.

Another inner class of Server is Request, which is used to send requests and responses between submitters (masters) and slaves. A Request also implements Streamable (implementation not shown), which allows for more efficient marshalling.

We're sending 3 types of requests around: (1) EXECUTE: contains the task and a ClusterID, generated by the submitter and used by the slave to determine whether or not to accept the task; note that only one slave in the entire cluster will handle a given task. (2) RESULT: unicast from the slave to the master which submitted the task. (3) REMOVE: everyone removes the task from their cache upon reception of this message. Now that we have all the bits and pieces in place, it is actually time to look at the submit() method:

This is an implementation of the Master interface. Remember that JGroups can only ship byte[] buffers over the wire, so the request (including the task) is marshalled into a buffer and sent as a message. Finally, we block on the promise of the Entry until a result has been received, or we get an exception (e.g. an interrupt). The Slave part, for handling of received tasks, is simple:

We simply take the task and call execute() on it. In the Server class, we implement 2 methods of the receiver: receive(Message) and viewAccepted(View). The receive() method, shown below, handles all requests and responses. Upon reception of a message, we need to grab its byte[] buffer, unmarshal it into a Request and then handle the request; a JGroups helper method from the Util class does the unmarshalling. On reception of RESULT (sent by a slave), we set the result in the promise, releasing the blocked submitter of the task.

The handleExecute() method checks if a node should accept the task and, if yes, passes it to a thread pool to execute. First, we add the task to our tasks cache, keyed by the ClusterID. Then the task's ID modulo the cluster size is computed; this is the rank of the node which should execute the task. If it matches our own rank, we create a Handler and pass it to the thread pool to be executed on a separate thread; otherwise we return from handleExecute(). The Handler class is shown below:

It executes the task against the Slave interface's handle() method and stores the result. If there is an exception, then the exception (which is serializable by default) is stored as the result instead. Then a Response object is created from the result. Our code is now almost complete. The only thing missing is the handling of membership changes. Remember that we need to re-submit tasks from failed nodes, or from nodes who left gracefully, to other nodes.

This is done in viewAccepted(View). First, we determine which members left between the new and the previous view. This is done with Util.determineLeftMembers(). Then we set the local address (JChannel.getAddress()) and the current view. Next, the rank is computed by iterating through the new membership and comparing each element against the local address. On a match, our rank is the iteration counter.
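
A sketch of that rank computation (field names are illustrative; the real callback also handles left members):

    // called on every view change
    public void viewAccepted(View view) {
        java.util.List<Address> members = view.getMembers();
        local_addr = channel.getAddress();
        for (int i = 0; i < members.size(); i++) {
            if (members.get(i).equals(local_addr)) {
                rank = i;   // our rank: the index of our address in the view
                break;
            }
        }
    }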

Finally, we need to determine whether any nodes left since the previous view, and whether there are any tasks to take over from them. This is done by iterating through all left members (if there are any) and calling handleLeftMember(), which is shown below. This method iterates through all cache entries and compares the task ID (modulo cluster size) to our own rank.

If it matches, we execute the task, unless the submitter itself left, in which case we drop the task. Both the view and rank assignments need to happen before handleLeftMember() is called, as this method uses the 2 variables. Note that ranks shift when a member leaves: in a cluster {A,B,C,D,E}, if C crashes, D's and E's ranks change, D's rank now being 2. This means that D will process all of the tasks that C was processing and which hadn't completed by the time C crashed (otherwise C would have removed them). It also means that B, C and D may now execute tasks which were already being worked on by other nodes.

For example, C will re-execute D's tasks and B will re-execute C's tasks. This is not incorrect, as the submitter of a task will remove the task when completed. So, when receiving a result R from a slave for a task which was already completed (and therefore removed), the submitter just drops R.

This is not wrong, but it leads to spurious and unneeded processing. A better way to define the rank would be to use consistent hashing [2], which minimizes changes to the rank, and therefore re-execution of tasks already being worked on by other nodes. Our code is now complete. The last thing to do is to write the driver code, which we also add to Server:

The main() method creates a Server and starts it. The loop() method waits for a key press and then submits a short-running (on '1') or long-running (on '2') task. The task simply returns a new Date with the current time; the long-running task sleeps for 15 seconds before returning the date. When 'q' is pressed, we stop the server gracefully and return. Let's see whether this thing works! The demo JAR can be downloaded here.

Let's start some instances and submit some tasks. To start an instance, we run the JAR, replacing the IP address given with -Djgroups.bind_addr with that of a valid network interface. The output shows that we're the first node in the cluster. When we submit a task, we see that it is executed by ourselves, since we're the only node in the cluster:

We can see that the view now has 2 members. Note that for this demo, we start all instances as separate processes on the same host, but of course we would place those processes on different hosts in real life. If we now go back to the first instance and submit 2 tasks, we can see that they are assigned to both instances: task 2 was executed by ourselves, but task 3 was executed by the second instance (this can be verified by looking at the output of the second instance).

The available attributes in the FD element are listed below. A shunned node would need to rejoin the cluster via the discovery process. JGroups allows applications to configure a channel such that, when a channel is shunned, the process of rejoining the cluster and transferring state takes place automatically.
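
An illustrative FD configuration (attribute values are examples only, not recommendations):

    <FD timeout="6000" max_tries="5" shun="true"/>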

Regular traffic from a node is proof of life, so heartbeat messages are only sent when no regular traffic is detected from the node for a long period of time. FD_SOCK, in contrast, is based on a ring of TCP sockets: each member in a group connects to its neighbor, with the final member connecting to the first, forming a ring. When nodes intend to leave the group, they inform their neighbors so that they do not become suspected.

By default, JGroups uses the IP address given in the system property jgroups.bind_addr to bind its sockets. This system property can be set with the -b command line switch. For more information about binding JGroups sockets, see the section on socket binding. The VERIFY_SUSPECT protocol verifies whether a suspected member is really dead by pinging that member once again.

This verification is performed by the coordinator of the cluster. The suspected member is dropped from the cluster group if confirmed to be dead. The aim of this protocol is to minimize false suspicions.
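
An illustrative VERIFY_SUSPECT configuration (the timeout value is an example):

    <VERIFY_SUSPECT timeout="1500"/>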

Let us look at the differences between these failure detection protocols to understand how they complement each other. With FD, an overloaded machine might be slow in sending are-you-alive responses, and low timeouts lead to a higher probability of false suspicions and higher network traffic. With FD_SOCK, a member suspended in a debugger is no problem, because its TCP connection is still open; on the other hand, members will only be suspected when the TCP connection breaks, so hung members will not be detected. A failure detection layer is intended to report real failures promptly, while avoiding false suspicions.

There are two solutions: lower the TCP keepalive interval, or combine FD_SOCK with FD. If a host crashed, or an intermediate switch or router crashed, without closing the TCP connection properly, we would by default detect this only after the TCP keepalive kicks in, i.e. after 2 hours plus a few minutes.

Lowering the keepalive can only be done for the entire kernel in most operating systems, so if it is lowered to 15 minutes, this will affect all TCP sockets. With the second solution, however, in the case of a crashed switch or host, FD will make sure the socket is eventually closed and the suspect message generated. With FD_SOCK and FD combined, a member becomes suspected when the neighboring socket has been closed abnormally (in a process crash, for instance, since the operating system then closes all sockets).

However, if a host or switch crashes, the sockets will not be closed; FD will then suspect the neighbor after its configured timeout (in milliseconds) has elapsed. Note that if such a system were stopped at a breakpoint in the debugger, the node being debugged would likewise be suspected once the timeout has elapsed. Reliable Delivery Protocols. Reliable delivery protocols within the JGroups stack ensure that messages are actually delivered, and delivered in the correct order (First In, First Out, or FIFO) to the destination node.

There are two flavors: in ACK mode, the sender resends the message until an acknowledgment is received from the receiver; in NAK mode, the receiver requests re-transmission when it discovers a gap. The UNICAST protocol handles reliable delivery of unicast messages and uses positive acknowledgements (ACK). It is configured as a sub-element under the JGroups config element. Its timeout attribute is a list of retransmission intervals: with a timeout list such as 100,200,400,800, the sender resends the message if it has not received an ACK after 100 milliseconds the first time, waits another 200 milliseconds before the second re-transmission, and so on.
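
An illustrative configuration (intervals are examples only):

    <UNICAST timeout="100,200,400,800"/>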

A low value for the first timeout allows for prompt re-transmission of dropped messages, but means that messages may be transmitted more than once if they have not actually been lost (that is, the message has been sent, but the ACK has not been received before the timeout).

Higher values can improve performance if the network is tuned such that UDP datagram loss is infrequent. High values on networks with frequent losses will be harmful to performance, since later messages will not be delivered until lost messages have been re-transmitted.

The pbcast.NAKACK protocol handles reliable delivery of multicast messages and uses negative acknowledgements (NAK). Under this protocol, each message is tagged with a sequence number. The receiver keeps track of the received sequence numbers and delivers the messages in order. When a gap in the series of received sequence numbers is detected, the receiver schedules a task to periodically ask the sender to re-transmit the missing message. The task is canceled if the missing message is received. Here is an example configuration:
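
An illustrative configuration (attribute values are examples only):

    <pbcast.NAKACK retransmit_timeout="300,600,1200,2400,4800"
                   use_mcast_xmit="false"
                   discard_delivered_msgs="true"/>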

The configurable attributes in the pbcast.NAKACK element include use_mcast_xmit, which causes the sender to multicast a re-transmission to the entire cluster rather than unicasting it to the requester. This is useful when the sender's network layer tends to drop packets, avoiding the need to individually re-transmit to each node. Another attribute, discard_delivered_msgs, controls retransmission buffering: by default, nodes save delivered messages, so any node can re-transmit a lost message in case the original sender has crashed or left the group.

However, if we only ever ask the original sender to resend its messages, we can enable this option and discard delivered messages. Group Membership (GMS). This protocol handles the requests to join and leave the cluster.

All nodes in the cluster, as well as any interested services like JBoss Cache or HAPartition, are notified if the group membership changes. The group membership service is configured in the pbcast.GMS sub-element under the JGroups config element. Here is an example configuration. The configurable attributes in the pbcast.GMS element include join_timeout (the maximum number of milliseconds to wait for a new node's JOIN request to succeed; retry afterwards) and disable_initial_coord (prevent this node from becoming the coordinator during the initial channel connection; this flag does not prevent a node becoming the coordinator after the initial channel connection, if the current coordinator leaves the group).
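
An illustrative configuration (attribute values are examples only):

    <pbcast.GMS print_local_addr="true" join_timeout="3000"
                join_retry_timeout="2000" shun="true"
                view_bundling="true"/>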

A further attribute, view_bundling, bundles multiple JOIN or LEAVE requests arriving at the same time into a single view change; this is more efficient than handling each request separately. Flow Control (FC). The flow control (FC) protocol tries to adapt the data sending rate to the data receipt rate among nodes.

If a sender node is too fast, it might overwhelm the receiver node and result in out-of-memory conditions or dropped packets that have to be re-transmitted. In JGroups, flow control is implemented via a credit-based system. The sender and receiver nodes have the same number of credits (bytes) to start with. The sender subtracts credits by the number of bytes in the messages it sends.

The receiver accumulates credits for the bytes in the messages it receives. When the sender's credits drop to a threshold, the receivers send some credits to the sender. If the sender's credits are used up, the sender blocks until it receives credits from the receiver. The flow control protocol is configured in the FC sub-element under the JGroups config element. Its main configurable attribute is max_credits, the maximum number of credits in bytes; this value should be smaller than the JVM heap size.
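
An illustrative configuration (attribute values are examples only):

    <FC max_credits="2000000" min_threshold="0.10"/>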


