Hystrix - Multiple Service Instance Endpoints

The following code example implements Hystrix Command for multiple service endpoints.

Assume there are two service endpoints (different hosts/port) providing same service. In this case we would have two hystrix command to call the service at both endpoints. The following code chooses the endpoint (hystrix command) on round robin fashion . Each command will have its own circuit breaker. The properties are configured using archaius.

If a service endpoint is down then the command will be marked as disabled and it will be enabled after the sleeping window period. In the mean time the other service endpoint will be used. The logic/algorithm to choose between service endpoint is provided in the HystrixCommandManager.

package com.test.hystrix;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.Socket;

import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandKey;
import com.netflix.hystrix.HystrixCommandProperties;

public class HystrixTestCommand extends SuperHystrixCommand<String> {

        private final String name;

        private final String commandKeyName;

        private final Integer port;

        public HystrixTestCommand(String name, String commandKey, Integer port) {
                super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
                               
                                .andCommandPropertiesDefaults(
                                                HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(3000))
                                .andCommandKey(HystrixCommandKey.Factory.asKey(commandKey)));

               
                this.name = name;
                this.commandKeyName = commandKey;
                this.port = port;
        }

        @Override
        protected String run() throws Exception {
                // a real example would do work like a network call here

                String inMsg = null;
                try {
                        Socket socket = new Socket("localhost", port);
                        socket.setSoTimeout(5000);
                        System.out.println("Started client  socket at " + socket.getLocalSocketAddress());
                        BufferedReader socketReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                        BufferedWriter socketWriter = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
                        BufferedReader consoleReader = new BufferedReader(new InputStreamReader(System.in));

                        String promptMsg = "Please enter a  message  (Bye  to quit):";
                        String outMsg = name;
                       
                        socketWriter.write(outMsg);
                        socketWriter.write("\n");
                        socketWriter.flush();
                       
                        String s = null;
                        do {                           
                                s = socketReader.readLine();
                               
                                if (s != null)
                                        inMsg = s;
                               
                        } while (s == null);
                        // }
                        socket.close();
                } catch (Exception e) {
                        System.out.println("Exception occured");
                       
                        throw new Exception(e);
                }

                return inMsg;
        }

        @Override
        protected String getFallback() {
                this.isCircuitBreakerOpen();
                return "Good Bye";
        }

        public boolean canAllowRequest() {
                return this.circuitBreaker.allowRequest();
        }

        public String getCommandKeyName() {
                return commandKeyName;
        }

        public Integer getPort() {
                return port;
        }
}



SocketExample .java

package com.test.hystrix;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

public class SocketExample {

        public static void main(String[] args) throws Exception {

                ServerSocket serverSocket = new ServerSocket(19065, 100, InetAddress.getByName("localhost"));
                System.out.println("Server started  at:  " + serverSocket);

                while (true) {
                        System.out.println("Waiting for a  connection...");

                        final Socket activeSocket = serverSocket.accept();

                        System.out.println("Received a  connection from  " + activeSocket);
                        Runnable runnable = () -> handleClientRequest(activeSocket);
                        new Thread(runnable).start(); // start a new thread
                }
        }

        public static void handleClientRequest(Socket socket) {
                try {

                        BufferedReader socketReader = null;
                        BufferedWriter socketWriter = null;
                        socketReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                        socketWriter = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));

                        String inMsg = null;
                        while ((inMsg = socketReader.readLine()) != null) {
                                System.out.println("Received from  client: " + inMsg);

                                String outMsg = "Hello " + inMsg + "!";
                                socketWriter.write(outMsg);
                                socketWriter.write("\n");
                                // Thread.currentThread().sleep(3000);

                                for (int i = 0; i < 100; i++) {
                                }

                                socketWriter.flush();
                        }
                        socket.close();
                } catch (Exception e) {
                        e.printStackTrace();
                }
        }
}

SuperHystrixCommand.java

package com.test.hystrix;

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixThreadPoolKey;

public abstract class SuperHystrixCommand<T> extends HystrixCommand<T> {

        private boolean enabled;

        public SuperHystrixCommand(HystrixCommandGroupKey group, HystrixThreadPoolKey threadPool,
                        int executionIsolationThreadTimeoutInMilliseconds) {
                super(group, threadPool, executionIsolationThreadTimeoutInMilliseconds);               
        }

        public SuperHystrixCommand(HystrixCommandGroupKey group, HystrixThreadPoolKey threadPool) {
                super(group, threadPool);              
        }

        public SuperHystrixCommand(HystrixCommandGroupKey group, int executionIsolationThreadTimeoutInMilliseconds) {
                super(group, executionIsolationThreadTimeoutInMilliseconds);           
        }

        public SuperHystrixCommand(com.netflix.hystrix.HystrixCommand.Setter setter) {
                super(setter);         
        }

        @Override
        protected T run() throws Exception {           
                return null;
        }

        public abstract boolean canAllowRequest();     

        public String getCommandKeyName() {
                return "";
        }

        public Integer getPort() {
                return 0;
        }

        public boolean isEnabled() {
                return this.enabled;
        }

        public void setEnabled(boolean enabled) {
                this.enabled = enabled;
        }
}

HystrixCommandManager.java

package com.test.hystrix;

import java.util.ArrayList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;

public class HystrixCommandManager {

        public static  List<SuperHystrixCommand<String>> commandSet = new ArrayList<>();
       
        private static HystrixCommandManager hcM = new HystrixCommandManager();
       
        static {
               
                TimerTask timerTask = new MyTimerTask();
               
                Timer timer = new Timer(true);
                timer.scheduleAtFixedRate(timerTask, 0, 10*1000);
        }      
       
        private final ThreadLocal<Integer> cIndex = new ThreadLocal<Integer>() {
        @Override
        protected Integer initialValue() {
            return new Integer(0);
        }
    };
       
        private HystrixCommandManager() {
                super();
                HystrixTestCommand cw = new HystrixTestCommand("World","primaryCommand",19065);
                HystrixTestCommand cwSecT = new HystrixTestCommand("Secondary World","secondaryCommand",19066);
               
                commandSet.add(cw);
                commandSet.add(cwSecT);
        }

 public static HystrixCommandManager getInstance()
 {
         return hcM;
 }

        public SuperHystrixCommand<String> getHystrixCommandForPrimaryChannel(){
               
                SuperHystrixCommand<String> primaryCommand = new HystrixTestCommand("World","primaryCommand",19065);

                HystrixTestCommand cwSecT = new HystrixTestCommand("Secondary World","secondaryCommand",19066);

                int listSize  = commandSet.size();
                int counter  = 0;
             while (listSize > 0 && counter < listSize) {                
                 
                 if (this.cIndex.get() >= listSize) {
                        this.cIndex.set(0);
                    }    
                   
                 SuperHystrixCommand<String> cmd = (SuperHystrixCommand<String>) this.commandSet.get(this.cIndex.get() % listSize);
                 
                 Integer indexIncrementor = this.cIndex.get();
                    this.cIndex.set(++indexIncrementor);
                 
                        if(cmd.isEnabled()  ){
                                System.out.println("Manager - > "+cmd.getCommandKeyName()+" circuit -> "+cmd.isCircuitBreakerOpen());
                               
                                        primaryCommand = new HystrixTestCommand("World", cmd.getCommandKeyName(),cmd.getPort());
                                return primaryCommand;
                        }
                        else if(cmd.isCircuitBreakerOpen()){
                                cmd.setEnabled(false);
                        }
                                counter++;                     
                }
               
                 return primaryCommand;
        }      
}

class MyTimerTask extends TimerTask {

    @Override
    public void run() {
       
        for(SuperHystrixCommand<String> cmd:HystrixCommandManager.commandSet) {                
                if(cmd.isEnabled())
                        continue;
                else
                        cmd.setEnabled(true);
        }            
    }

    private void completeTask() {
        try {
            //assuming it takes 20 secs to complete the task
            Thread.sleep(20000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
   
    public  void main1(String args[]){
        TimerTask timerTask = new MyTimerTask();
     
        Timer timer = new Timer(true);
        timer.scheduleAtFixedRate(timerTask, 0, 10*1000);
        System.out.println("TimerTask started");  
    }
}

HystrixTestExecutor.java

package com.test.hystrix;

import com.netflix.config.ConfigurationManager;
import com.netflix.hystrix.HystrixCommandKey;
import com.netflix.hystrix.HystrixCommandMetrics;
import com.netflix.hystrix.HystrixCommandMetrics.HealthCounts;
import com.netflix.hystrix.HystrixRequestLog;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;
import rx.Observable;
import rx.Observer;

public class HystrixTestExecutor {

        public static void main(String[] args) {

                HystrixRequestContext context = HystrixRequestContext.initializeContext();

                ConfigurationManager.getConfigInstance().setProperty("hystrix.threadpool.default.coreSize", 8);
                ConfigurationManager.getConfigInstance().setProperty(
                                "hystrix.command.default.execution.isolation.thread.timeoutInMilliseconds", 3000);
               
                ConfigurationManager.getConfigInstance()
                                .setProperty("hystrix.command.default.metrics.rollingPercentile.numBuckets", 600);
                ConfigurationManager.getConfigInstance()
                                .setProperty("hystrix.command.default.metrics.rollingStats.numBuckets", 2);
                ConfigurationManager.getConfigInstance()
                                .setProperty("hystrix.command.default.metrics.rollingStats.timeInMilliseconds", 100000);

                ConfigurationManager.getConfigInstance()
                                .setProperty("hystrix.command.default.metrics.healthSnapshot.intervalInMilliseconds", 5000);
                ConfigurationManager.getConfigInstance()
                                .setProperty("hystrix.command.default.execution.timeout.enabled", true);
                ConfigurationManager.getConfigInstance()
                                .setProperty("hystrix.command.default.circuitBreaker.errorThresholdPercentage", 50);
                ConfigurationManager.getConfigInstance()
                                .setProperty("hystrix.command.default.circuitBreaker.requestVolumeThreshold", 2);
                ConfigurationManager.getConfigInstance()
                                .setProperty("hystrix.command.default.circuitBreaker.sleepWindowInMilliseconds", 10000);

                 startMetricsMonitor(true);

                for (int i = 0; i < 50; i++) {
                        HystrixTestCommand cw = (HystrixTestCommand) HystrixCommandManager.getInstance().getHystrixCommandForPrimaryChannel(); //new HystrixTestCommand("World");
                       
                        //HystrixTestCommand cw = new HystrixTestCommand("World", "hystrixTestCommand", 19065);
                       
                        Observable<String> fWorld = cw.observe();

                        fWorld.subscribe(new Observer<String>() {

                                @Override
                                public void onCompleted() {                                    
                                        // System.out.println("Completed");
                                }

                                @Override
                                public void onError(Throwable e) {
                                        System.out.println("Error - > "+e.getMessage());
                                }

                                @Override
                                public void onNext(String v) {
                                        System.out.println("Response: " + v);
                                }
                        });
                       
                        try {
                                Thread.sleep(1000);
                        } catch (Exception e) {                        
                        }
                       
                        System.out.println("Circuit Status -> " + (cw.isCircuitBreakerOpen()?"Opened":"closed"));
            System.out.print("---"+cw.getCommandKeyName() +"--\n");
                        fWorld.toBlocking().single();

                        System.out.println("Request => " + HystrixRequestLog.getCurrentRequest().getExecutedCommandsAsString());                       
                }

                HystrixCommandMetrics metrics = HystrixCommandMetrics
                                .getInstance(HystrixCommandKey.Factory.asKey(HystrixTestCommand.class.getSimpleName()));

                System.out.println("stats -> " + getStatsStringFromMetrics(metrics));
        }

        public static void startMetricsMonitor(final boolean shouldLog) {
                Thread t = new Thread(new Runnable() {

                        @Override
                        public void run() {
                                while (true) {
                                       
                                        // wait 5 seconds on each loop
                                        try {
                                                Thread.sleep(500);
                                        } catch (Exception e) {
                                                // ignore
                                        }

                                        // we are using default names so can use
                                        // class.getSimpleName() to derive the keys
                                        HystrixCommandMetrics metrics = HystrixCommandMetrics
                                                        .getInstance(HystrixCommandKey.Factory.asKey(HystrixTestCommand.class.getSimpleName()));
                                        System.out.println("stats -> " + getStatsStringFromMetrics(metrics));
                                }
                        }
                });

                t.setDaemon(true);
                t.start();
        }

        public static String getStatsStringFromMetrics(HystrixCommandMetrics metrics) {
                StringBuilder m = new StringBuilder();
                if (metrics != null) {
                        HealthCounts health = metrics.getHealthCounts();
                        m.append("Requests: ").append(health.getTotalRequests()).append(" ");
                        m.append("Errors: ").append(health.getErrorCount()).append(" (").append(health.getErrorPercentage())
                                        .append("%)   ");
                        m.append("Mean: ").append(metrics.getExecutionTimePercentile(50)).append(" ");
                        m.append("75th: ").append(metrics.getExecutionTimePercentile(75)).append(" ");
                        m.append("90th: ").append(metrics.getExecutionTimePercentile(90)).append(" ");
                        m.append("99th: ").append(metrics.getExecutionTimePercentile(99)).append(" ");
                }
                return m.toString();
        }
}

Technology: 

Search