Skip to content

Commit

Permalink
Merge pull request #29 from jihor/feature-max-retries
Browse files Browse the repository at this point in the history
Add retry capabilities
  • Loading branch information
aatarasoff authored Oct 2, 2016
2 parents 466f84d + ecb65a8 commit 9fee28c
Show file tree
Hide file tree
Showing 7 changed files with 149 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
public class ThriftClientPooledObjectFactory extends BaseKeyedPooledObjectFactory<ThriftClientKey, TServiceClient> {
public static final int DEFAULT_CONNECTION_TIMEOUT = 1000;
public static final int DEFAULT_READ_TIMEOUT = 30000;
public static final int DEFAULT_MAX_RETRIES = 1;
private TProtocolFactory protocolFactory;
private LoadBalancerClient loadBalancerClient;
private PropertyResolver propertyResolver;
Expand All @@ -38,6 +39,7 @@ public TServiceClient create(ThriftClientKey key) throws Exception {

int connectTimeout = propertyResolver.getProperty(serviceName + ".connectTimeout", Integer.class, DEFAULT_CONNECTION_TIMEOUT);
int readTimeout = propertyResolver.getProperty(serviceName + ".readTimeout", Integer.class, DEFAULT_READ_TIMEOUT);
int maxRetries = propertyResolver.getProperty(serviceName + ".maxRetries", Integer.class, DEFAULT_MAX_RETRIES);

TProtocol protocol;

Expand All @@ -49,6 +51,7 @@ public TServiceClient create(ThriftClientKey key) throws Exception {
);
loadBalancerClient.setConnectTimeout(connectTimeout);
loadBalancerClient.setReadTimeout(readTimeout);
loadBalancerClient.setMaxRetries(maxRetries);

protocol = protocolFactory.getProtocol(loadBalancerClient);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public class TLoadBalancerClient extends TTransport {
private InputStream inputStream_ = null;
private int connectTimeout_ = 0;
private int readTimeout_ = 0;
private int maxRetries_ = 1;
private Map<String, String> customHeaders_ = null;

public TLoadBalancerClient(LoadBalancerClient loadBalancerClient, String serviceName, String path) throws TTransportException {
Expand All @@ -41,6 +42,13 @@ public void setReadTimeout(int timeout) {
this.readTimeout_ = timeout;
}

public void setMaxRetries(int maxRetries) {
if (maxRetries <= 0) {
throw new RuntimeException("Illegal maxRetries value [" + maxRetries + "]. Positive value expected");
}
this.maxRetries_ = maxRetries;
}

public void setCustomHeaders(Map<String, String> headers) {
this.customHeaders_ = headers;
}
Expand Down Expand Up @@ -97,50 +105,64 @@ public void write(byte[] buf, int off, int len) {
public void flush() throws TTransportException {
byte[] data = this.requestBuffer_.toByteArray();
this.requestBuffer_.reset();
int retryCount = 0;
while (true) {
try {
retryCount++;
doFlush(data);
return;
} catch (IOException ioe) {
if (retryCount >= maxRetries_) {
throw new TTransportException(ioe);
}
} catch (Exception e) {
if (retryCount >= maxRetries_) {
throw e;
}
}
}
}

try {
ServiceInstance serviceInstance = this.loadBalancerClient.choose(serviceName);
private void doFlush(byte[] data) throws TTransportException, IOException {
ServiceInstance serviceInstance = this.loadBalancerClient.choose(serviceName);

if (serviceInstance == null) {
throw new TTransportException(TTransportException.NOT_OPEN, "No one service instance is available");
}
if (serviceInstance == null) {
throw new TTransportException(TTransportException.NOT_OPEN, "No service instances available");
}

HttpURLConnection iox =
(HttpURLConnection) new URL(
serviceInstance.getUri().toString() + path
).openConnection();
if (this.connectTimeout_ > 0) {
iox.setConnectTimeout(this.connectTimeout_);
}
HttpURLConnection iox =
(HttpURLConnection) new URL(
serviceInstance.getUri().toString() + path
).openConnection();
if (this.connectTimeout_ > 0) {
iox.setConnectTimeout(this.connectTimeout_);
}

if (this.readTimeout_ > 0) {
iox.setReadTimeout(this.readTimeout_);
}
if (this.readTimeout_ > 0) {
iox.setReadTimeout(this.readTimeout_);
}

iox.setRequestMethod("POST");
iox.setRequestProperty("Content-Type", "application/x-thrift");
iox.setRequestProperty("Accept", "application/x-thrift");
iox.setRequestProperty("User-Agent", "Java/THttpClient");
if (this.customHeaders_ != null) {
Iterator responseCode = this.customHeaders_.entrySet().iterator();
iox.setRequestMethod("POST");
iox.setRequestProperty("Content-Type", "application/x-thrift");
iox.setRequestProperty("Accept", "application/x-thrift");
iox.setRequestProperty("User-Agent", "Java/THttpClient");
if (this.customHeaders_ != null) {
Iterator responseCode = this.customHeaders_.entrySet().iterator();

while (responseCode.hasNext()) {
Map.Entry header = (Map.Entry) responseCode.next();
iox.setRequestProperty((String) header.getKey(), (String) header.getValue());
}
while (responseCode.hasNext()) {
Map.Entry header = (Map.Entry) responseCode.next();
iox.setRequestProperty((String) header.getKey(), (String) header.getValue());
}
}

iox.setDoOutput(true);
iox.connect();
iox.getOutputStream().write(data);
int responseCode1 = iox.getResponseCode();
if (responseCode1 != 200) {
throw new TTransportException("HTTP Response code: " + responseCode1);
} else {
this.inputStream_ = iox.getInputStream();
}
} catch (IOException var5) {
throw new TTransportException(var5);
iox.setDoOutput(true);
iox.connect();
iox.getOutputStream().write(data);
int responseCode1 = iox.getResponseCode();
if (responseCode1 != 200) {
throw new TTransportException("HTTP Response code: " + responseCode1);
} else {
this.inputStream_ = iox.getInputStream();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@
import example.TName;
import info.developerblog.spring.thrift.annotation.ThriftClient;
import info.developerblog.spring.thrift.annotation.ThriftClientsMap;
import java.util.Map;
import org.apache.thrift.TException;
import org.springframework.stereotype.Service;

import java.util.Map;

/**
* Created by aleksandr on 08.09.15.
*/
Expand All @@ -20,6 +21,12 @@ public class GreetingService {
@ThriftClient(serviceId = "greeting-service-with-timeouts", path = "/api")
TGreetingService.Client clientWithTimeout;

@ThriftClient(serviceId = "greeting-service-with-timeouts-oneoff", path = "/api")
TGreetingService.Client oneoffClientWithTimeout;

@ThriftClient(serviceId = "greeting-service-with-timeouts-retriable", path = "/api")
TGreetingService.Client retriableClientWithTimeout;

@ThriftClientsMap(mapperClass = SampleMapper.class)
Map<String, TGreetingService.Client> clientsMap;

Expand All @@ -34,6 +41,14 @@ public String getGreetingWithTimeout(String lastName, String firstName) throws T
return clientWithTimeout.greet(new TName(firstName, lastName));
}

public String getOneOffGreetingWithTimeout(String lastName, String firstName) throws TException {
return oneoffClientWithTimeout.greet(new TName(firstName, lastName));
}

public String getRetriableGreetingWithTimeout(String lastName, String firstName) throws TException {
return retriableClientWithTimeout.greet(new TName(firstName, lastName));
}

public String getGreetingForKey(String key, String lastName, String firstName) throws TException {
return clientsMap.get(key).greet(new TName(firstName, lastName));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package info.developerblog.examples.thirft.simpleclient;

import org.apache.commons.pool2.KeyedObjectPool;
import info.developerblog.examples.thirft.simpleclient.configuration.CountingAspect;
import info.developerblog.examples.thirft.simpleclient.configuration.TestConfiguration;
import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
import org.apache.thrift.transport.TTransportException;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand All @@ -23,7 +25,7 @@
* Created by aleksandr on 01.09.15.
*/
@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = SimpleClientApplication.class)
@SpringApplicationConfiguration(classes = {SimpleClientApplication.class, TestConfiguration.class})
@WebAppConfiguration
@IntegrationTest("server.port:8080")
@DirtiesContext
Expand All @@ -38,6 +40,9 @@ public class TGreetingServiceHandlerTests {
@Autowired
GenericKeyedObjectPool clientPool;

@Autowired
CountingAspect countingAspect;

@Value("${thrift.client.max.threads}")
private int maxThreads;

Expand Down Expand Up @@ -75,6 +80,28 @@ public void testMisconfigurableClient() throws Exception {
greetingService.getGreetingWithMisconfguration("Doe", "John");
}

@Test
public void testClientWithDefaultRetries() throws Exception {
countingAspect.counter.set(0);
try {
greetingService.getOneOffGreetingWithTimeout("Doe", "John");
Assert.fail("TTransportException Expected");
} catch (TTransportException e){
Assert.assertEquals(1, countingAspect.counter.intValue());
}
}

@Test
public void testClientWithMultipleRetries() throws Exception {
countingAspect.counter.set(0);
try {
greetingService.getRetriableGreetingWithTimeout("Doe", "John");
Assert.fail("TTransportException Expected");
} catch (TTransportException e){
Assert.assertEquals(3, countingAspect.counter.intValue());
}
}

@Test
public void testClientThreadCount() {
assertEquals(clientPool.getMaxIdlePerKey(), maxThreads);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package info.developerblog.examples.thirft.simpleclient.configuration;

import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.annotation.Pointcut;

import java.util.concurrent.atomic.AtomicInteger;

/**
* @author jihor ([email protected])
* Created on 2016-09-30
*/
@Aspect
public class CountingAspect {
public AtomicInteger counter = new AtomicInteger(0);

@Pointcut("execution(* org.springframework.cloud.client.loadbalancer.LoadBalancerClient.choose(..))")
private void loadBalancerServerChoice() {}

@Before("loadBalancerServerChoice()")
public void before(){
counter.incrementAndGet();
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
package info.developerblog.examples.thirft.simpleclient.configuration;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.EnableAspectJAutoProxy;

/**
* Created by aleksandr on 18.02.16.
*/
@Configuration
@EnableAspectJAutoProxy
public class TestConfiguration {
@Bean
public CountingAspect countingAspect(){
return new CountingAspect();
}

}
11 changes: 11 additions & 0 deletions examples/simple-client/src/test/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,16 @@ greeting-service-with-timeouts:
readTimeout: 1
ribbon:
listOfServers: localhost:8080
greeting-service-with-timeouts-oneoff:
connectTimeout: 1
readTimeout: 1
ribbon:
listOfServers: localhost:8080
greeting-service-with-timeouts-retriable:
connectTimeout: 1
readTimeout: 1
maxRetries: 3
ribbon:
listOfServers: localhost:8080

thrift.client.max.threads: 10

0 comments on commit 9fee28c

Please sign in to comment.