
· 3 min read
Tu Huynh

Spring Data Redis provides an easy way to integrate with Redis instances.

However, in some cases, it's more convenient to use an embedded server than to create an environment with a real server.

In this article, we will introduce how to use Keva as an embedded Redis server for Spring Boot tests.

Install Keva as a dependency

Keva is a Java library, so we can use it as a dependency in our project.

build.gradle

dependencies {
    implementation 'dev.keva:kevadb:1.0.0-rc2'
}

or:

pom.xml

<dependency>
    <groupId>dev.keva</groupId>
    <artifactId>kevadb</artifactId>
    <version>1.0.0-rc2</version>
</dependency>

Setup

After adding the dependencies, we should define the connection settings between the Redis server and our application.

Let's begin by creating a class that will hold our properties:

@Configuration
public class RedisProperties {
    private int redisPort;
    private String redisHost;

    public RedisProperties(
            @Value("${spring.redis.port}") int redisPort,
            @Value("${spring.redis.host}") String redisHost) {
        this.redisPort = redisPort;
        this.redisHost = redisHost;
    }

    // getters
}

Next, we should create a configuration class that defines the connection and uses our properties:

@Configuration
@EnableRedisRepositories
public class RedisConfiguration {

    @Bean
    public LettuceConnectionFactory redisConnectionFactory(
            RedisProperties redisProperties) {
        return new LettuceConnectionFactory(
                redisProperties.getRedisHost(),
                redisProperties.getRedisPort());
    }

    @Bean
    public RedisTemplate<?, ?> redisTemplate(LettuceConnectionFactory connectionFactory) {
        RedisTemplate<byte[], byte[]> template = new RedisTemplate<>();
        template.setConnectionFactory(connectionFactory);
        return template;
    }
}

The configuration is quite simple. Additionally, it allows us to run the embedded server on a different port.

Check out our Introduction to Spring Data Redis article to learn more about using Redis with Spring Boot.

Keva as Embedded Redis Server

Now, we'll configure the embedded server and use it in one of our tests. First, let's set the connection properties:

spring.redis.host=localhost
spring.redis.port=6370

After that, we'll create a @TestConfiguration-annotated class:

@TestConfiguration
public class TestRedisConfiguration {

    private KevaServer redisServer;

    public TestRedisConfiguration(RedisProperties redisProperties) {
        KevaConfig kevaConfig = KevaConfig.builder()
                .hostname(redisProperties.getRedisHost())
                .port(redisProperties.getRedisPort())
                .persistence(false)
                .aof(false)
                .workDirectory("./")
                .build();
        this.redisServer = KevaServer.of(kevaConfig);
    }

    @PostConstruct
    public void postConstruct() {
        redisServer.run();
    }

    @PreDestroy
    public void preDestroy() {
        redisServer.shutdown();
    }
}

The server will start once the context is up. It'll start on our machine, on the port we've defined in our properties. This way, we can run the tests without stopping an actual Redis server.

Additionally, the server will stop once the context is destroyed.
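The test below assumes a simple User entity and a Spring Data repository for it. Those classes aren't shown in this post, so here is a minimal sketch of what they might look like (the @RedisHash value and the field names are illustrative; @RedisHash and @Id come from Spring Data Redis and Spring Data Commons, and CrudRepository from Spring Data):

@RedisHash("users")
public class User {

    @Id
    private UUID id;
    private String name;

    public User(UUID id, String name) {
        this.id = id;
        this.name = name;
    }

    // getters and setters
}

public interface UserRepository extends CrudRepository<User, UUID> {
}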

Finally, let's create a test that'll use our TestRedisConfiguration class:

@RunWith(SpringRunner.class)
@SpringBootTest(classes = TestRedisConfiguration.class)
public class UserRepositoryIntegrationTest {

    @Autowired
    private UserRepository userRepository;

    @Test
    public void shouldSaveUser_toRedis() {
        UUID id = UUID.randomUUID();
        User user = new User(id, "name");

        User saved = userRepository.save(user);

        assertNotNull(saved);
    }
}

The user has been saved to our embedded Keva server.

Additionally, we had to add TestRedisConfiguration to @SpringBootTest explicitly. As mentioned earlier, the server starts before the tests run and stops once the context is destroyed.

The code for these examples is available over on GitHub.


· 5 min read
Tu Huynh

When developing the Keva project, I struggled to find a suitable IoC/DI framework, choosing between Spring, Guice, and others. While Spring is a popular choice, it is not a good fit for a project with a small number of components that needs to start fast. Guice is also a popular choice and will likely start faster than Spring (since it doesn't need to scan the classpath for components), but I personally don't like its API, which requires a lot of boilerplate (defining explicit bindings, etc.).

Finally, I decided to build a Java IoC/DI framework from scratch, modeled on Spring's IoC API but containing only the bare minimum logic of a DI framework. That means removing almost all of the "magic" of Spring IoC and focusing on the core logic: creating and managing beans, and injecting dependencies.

Why do we need DI/IoC?

Some prefer writing code without DI/IoC, manually initializing instances/components and manually injecting them, just like below:

var svc = new ShippingService(new ProductLocator(),
        new PricingService(), new InventoryService(),
        new TrackingRepository(new ConfigProvider()),
        new Logger(new EmailLogger(new ConfigProvider())));

Many don't realize that their dependency chains can become deeply nested, and it quickly becomes unwieldy to wire them up manually. Even with factories (the factory pattern), the code duplication is just not worth it.

DI/IoC helps to initialize and inject instances/components, and it wires them up automatically, so you don't have to write that code by hand. It also helps decouple classes and improve testability, so we get many benefits.
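For contrast, here is a rough sketch of how that same ShippingService could be declared so a container wires it up, using the kind of annotations introduced later in this post (the constructor and fields shown are only illustrative):

// Illustrative sketch: the container creates the dependencies and passes them in,
// so no calling code ever has to new them up manually.
@Component
public class ShippingService {
    private final ProductLocator productLocator;
    private final PricingService pricingService;
    private final InventoryService inventoryService;
    private final TrackingRepository trackingRepository;
    private final Logger logger;

    @Autowired
    public ShippingService(ProductLocator productLocator, PricingService pricingService,
                           InventoryService inventoryService, TrackingRepository trackingRepository,
                           Logger logger) {
        this.productLocator = productLocator;
        this.pricingService = pricingService;
        this.inventoryService = inventoryService;
        this.trackingRepository = trackingRepository;
        this.logger = logger;
    }
}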

But doesn't an IoC framework create magic? Yes, but if you can trust that the framework does its job, you can safely skip all of that property-wiring mumbo-jumbo. You've got other problems to solve.

How Keva IoC works

Since Keva IoC is written from scratch, I can control how much magic the framework has, and thus remove the unnecessary magic like bean lifecycles, property wrapping, etc.

For just the bare minimum logic of a DI framework, it contains:

  • Scan beans (scan the @Component annotated classes)
  • Get the beans definitions, then create the beans
  • Store beans in a "bean container"
  • Scan the @Autowired annotations, then automatically inject dependencies

Implement Keva IoC

Create annotation @Component first:

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface Component {
}

Create annotation @Autowired:

@Target({ ElementType.METHOD, ElementType.CONSTRUCTOR, ElementType.FIELD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Autowired {
}

Since @Autowired injects by type, but dependencies may also need to be injected by name, the @Qualifier annotation is created:

@Target({ ElementType.CONSTRUCTOR, ElementType.FIELD, ElementType.METHOD, ElementType.PARAMETER, ElementType.TYPE, ElementType.ANNOTATION_TYPE })
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface Qualifier {
    String value() default "";
}

How do we scan beans? We need a package that helps scan all the classes in the classpath; org.reflections is a good choice.

public static List<Class<?>> getClasses(String packageName) throws IOException, ClassNotFoundException {
    List<Class<?>> classes = new ArrayList<>();
    String path = packageName.replace('.', '/');
    ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
    Enumeration<URL> resources = classLoader.getResources(path);
    List<File> dirs = new ArrayList<>();
    while (resources.hasMoreElements()) {
        URL resource = resources.nextElement();
        dirs.add(new File(resource.getFile()));
    }
    for (File directory : dirs) {
        classes.addAll(findClasses(directory, packageName));
    }
    return classes;
}
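For comparison, with the org.reflections library mentioned above, the same scan can be reduced to a couple of lines (a sketch, assuming the library is on the classpath):

// Sketch: find every @Component-annotated class under a package with org.reflections
Reflections reflections = new Reflections(packageName);
Set<Class<?>> components = reflections.getTypesAnnotatedWith(Component.class);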

We have a BeanContainer class to store and manage all the beans:

public class BeanContainer {
    public final Map<Class<?>, Map<String, Object>> beans = new HashMap<>(10);
    // ...
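The container maps a bean's type to the named instances of that type. Here is a sketch of how beans might be stored and looked up in that structure (the method names are illustrative, not Keva IoC's actual API):

// Illustrative only: register a bean under its type and name
public void putBean(Class<?> type, String name, Object instance) {
    beans.computeIfAbsent(type, k -> new HashMap<>()).put(name, instance);
}

// Illustrative only: resolve a bean by type, optionally disambiguated by name
public Object getBeanByType(Class<?> type, String name) {
    Map<String, Object> candidates = beans.get(type);
    if (candidates == null || candidates.isEmpty()) {
        return null; // the real container would create the bean or fail here
    }
    return name != null ? candidates.get(name) : candidates.values().iterator().next();
}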

After scanning and creating all the beans, we next scan all the @Autowired annotations and inject the dependencies:

private void fieldInject(Class<?> clazz, Object classInstance) throws IllegalAccessException {
    Set<Field> fields = FinderUtil.findFields(clazz, Autowired.class);
    for (Field field : fields) {
        String qualifier = field.isAnnotationPresent(Qualifier.class)
                ? field.getAnnotation(Qualifier.class).value() : null;
        Object fieldInstance = _getBean(field.getType(), field.getName(), qualifier, true);
        field.setAccessible(true); // allow injecting into private fields
        field.set(classInstance, fieldInstance);
    }
}
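Constructor injection works on the same principle: resolve each constructor parameter from the container, honoring @Qualifier, then instantiate the class. A rough sketch (the FinderUtil helper used here is hypothetical):

private Object constructorInject(Class<?> clazz) throws ReflectiveOperationException {
    // Hypothetical helper: find the constructor annotated with @Autowired
    Constructor<?> constructor = FinderUtil.findAutowiredConstructor(clazz);
    Parameter[] parameters = constructor.getParameters();
    Object[] args = new Object[parameters.length];
    for (int i = 0; i < parameters.length; i++) {
        Parameter parameter = parameters[i];
        String qualifier = parameter.isAnnotationPresent(Qualifier.class)
                ? parameter.getAnnotation(Qualifier.class).value() : null;
        args[i] = _getBean(parameter.getType(), parameter.getName(), qualifier, true);
    }
    return constructor.newInstance(args);
}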

That's basically the core logic of Keva IoC; for more details, please refer to the Keva IoC source code.

KevaIoC usage

Let's say we have an interface Engine.java:

public interface Engine {
    String getName();
}

And we have a class V8Engine.java that implements Engine:

@Component
public class V8Engine implements Engine {
    public String getName() {
        return "V8";
    }
}

And SpiderMonkeyEngine.java also implements Engine:

@Component
public class SpiderMonkeyEngine implements Engine {
    public String getName() {
        return "SpiderMonkey";
    }
}

And a Browser.java class that needs an Engine implementation injected:

@Component
public class Browser {
    @Autowired
    String version;

    Engine engine;
    BrowserRenderer renderer;

    @Autowired
    public Browser(@Qualifier("v8Engine") Engine engine, BrowserRenderer renderer) {
        this.engine = engine;
        this.renderer = renderer;
    }

    public String run() {
        return renderer.render("This browser run on " + engine.getName());
    }

    public String getVersion() {
        return renderer.render("Browser version: " + version);
    }
}

And the Main class looks like:

public class Main {
    public static void main(String[] args) {
        KevaIoC context = KevaIoC.initBeans(Main.class);
        Browser browser = context.getBean(Browser.class);
        System.out.println(browser.run());
    }
}

The API looks basically the same as Spring IoC's; only the actual implementation is simpler and more concise, with less magic. The Keva IoC codebase stays clean and easy to understand because it follows Spring IoC's elegant API, and the startup time remains very fast thanks to its simplicity.

Summary

Some of the Keva IoC's main features are:

  • Spring-like annotations, no XML
  • Fast startup time, small memory footprint (performance section coming soon)
  • Pocket-sized, only basic features (no bean lifecycle, no "Spring magic")
  • Less opinionated, supports mounting existing beans (so it can integrate well with other IoC/DI frameworks)

Supported annotations:

  • @ComponentScan
  • @Component
  • @Configuration
  • @Bean
  • @Autowired (supports field injection, setter injection and constructor injection)
  • @Qualifier
  • Mounting existing beans via the .initBeans(Main.class, beanOne, beanTwo...) static method, as sketched below
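For example, mounting an already-constructed object next to the scanned components might look like this (ConsoleRenderer is a hypothetical implementation of BrowserRenderer):

// Sketch: pass pre-built beans to initBeans so they can be injected like scanned ones
BrowserRenderer renderer = new ConsoleRenderer();
KevaIoC context = KevaIoC.initBeans(Main.class, renderer);
Browser browser = context.getBean(Browser.class);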

Keva IoC is a great fit for small applications that need a small memory footprint, a small jar size, and fast startup time: plugins, (embedded) standalone applications, integration tests, jobs, Android applications, etc.

Maybe in the future, if Keva needs more from it, I'll add more "magic" features like bean lifecycles, but for now this is enough.

· 2 min read
Tuan Nguyen

Here are some proxy-based clustering solutions for Keva right now:

Twemproxy

Twemproxy (nutcracker) is a fast and lightweight proxy for the memcached and redis protocols developed by Twitter. It was built primarily to reduce the number of connections to the caching servers on the backend. This, together with protocol pipelining and sharding, enables you to horizontally scale your distributed caching architecture.

Features

  • Pipelining: Twemproxy enables proxying multiple client connections onto one or a few server connections. This architectural setup makes it ideal for pipelining requests and responses, and hence saves on round-trip time.
  • Zero copy: All the memory for incoming requests and outgoing responses is allocated in mbufs. Mbufs enable zero-copy because the same buffer on which a request was received from the client is used for forwarding it to the server.
  • Sharding: Data is sharded automatically across multiple servers (see the example configuration below).
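For illustration, a minimal Twemproxy pool (nutcracker.yml) that shards keys across two Redis-protocol backends could look roughly like this; the pool name, hosts, and ports are just an example:

alpha:
  listen: 127.0.0.1:22121
  hash: fnv1a_64
  distribution: ketama
  redis: true
  servers:
    - 127.0.0.1:6379:1 server1
    - 127.0.0.1:6380:1 server2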

Disadvantage

  • No automatic resharding when adding/removing nodes

Dynomite

Dynomite, inspired by the Dynamo whitepaper, is a thin, distributed dynamo layer for different storage engines and protocols. Currently these include Redis and Memcached. Dynomite supports multi-datacenter replication and is designed for high availability. Dynomite was originally a fork of Twemproxy.

Features

  • Everything Twemproxy offers
  • Replication: Dynomite offers replication between multiple clusters.
  • High availability: with the replication feature, Dynomite can handle different failure scenarios.

Disadvantage

  • No automatic resharding when adding/removing nodes

Summary

Twemproxy is good as a pure proxy that serves read/write requests to multiple servers, while Dynomite can also do that plus replication and HA, but it adds a bit more complexity since it is more than a mere proxy. For now, Keva can go with Twemproxy because of its simplicity.

· 7 min read
Viet Nguyen (Blu)

When developing the server for Keva, a learning-project database, I got the chance to learn a bit more about non-blocking I/O (NIO) and its libraries in Java. After looking at existing implementations such as Netty's, I was finally able to implement a working prototype by myself. You can check out the source code here (it's only a few short files).

The basics

There are two basic parts for this problem, first the NIO part, and then the server part.

Fundamentally, NIO at the application level just means not waiting around. For example, when we call a "read" method on a socket, the call returns immediately whether there is data to read or not, and the process continues to the next line of code instead of waiting for data. We can pass in a callback function to handle the results whenever they're ready.

The server's primary logic is to take in messages from clients, process them, and return the results to those clients, all via the network.

In a traditional blocking server, when we read bytes from a connection, the server has to wait for the whole message to arrive before processing it, since we can only read a limited amount of data into the buffer at a time. To handle multiple clients, we spawn multiple threads.

For the NIO server, a thread doesn't need to stop and wait for the whole message, so we can read what we can, continue doing other work, and come back to read again when there is new data. The main problem is how we manage bytes being read asynchronously to reconstruct correct messages. This is the problem I struggled with and finally managed to solve (though probably not in the optimal way).

The idea

My approach to this problem is an event-driven architecture. Specifically, we have two thread groups: the main group, responsible for accepting connections (this can be just one thread), and the worker group, responsible for reading, parsing, and writing results to the socket. The worker group is very important since I use it for executing reads and writes, and it's also used by Java's NIO.2 library to invoke completion handlers.

For example purposes, this will be a TCP echo server, and messages will use the \n line-ending character as the delimiter between them.

So what happens when data arrives? Well, it could arrive in any of the forms below:

  1. part\n : a full message, or the last part of a message.
  2. a partial mess : a partial message; we need a way to store it while waiting for the rest of the message to arrive.
  3. last part\n mess 2\n mess 3\n start new : we can also receive many messages, or portions of them, in a single socket read.

The flow

So the process will look like this:

Bootstrapping the server

  • We start the server by creating the threads used as workers for the socket channels as well as for our own processing.
private final ExecutorService worker = Executors.newFixedThreadPool(4);
private final ExecutorService main = Executors.newFixedThreadPool(1);

Then we bind the socket to the port and start accepting connections. We also need to make the server run forever; here I just used System.in.read() to achieve that.

group = AsynchronousChannelGroup.withThreadPool(worker);
server = AsynchronousServerSocketChannel.open(group);
final int port = 8989;
server.bind(new InetSocketAddress(port));
main.submit(() -> server.accept(null, new AcceptHandler(server, main, worker)));
System.out.println("Server started at: " + port);
System.in.read();

When a client is connected

  • The thread that invokes the handler will submit a task to accept connections again.
  • The accept completion handler will also initialize a byte buffer and a queue for storing completed messages to write, and then submit a task to read from the socket.
  • The messBuf will be used to store the current unfinished message.
  • The writeQueue needs to be thread-safe because it is used by both the reading thread and the writing thread.
public void completed(AsynchronousSocketChannel channel, Object attachment) {
    main.submit(() -> server.accept(null, this));

    final ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
    final StringBuffer messBuf = new StringBuffer();
    final Queue<String> writeQueue = new ConcurrentLinkedQueue<>();
    worker.submit(() -> channel.read(buffer, null,
            new ReadHandler(worker, channel, buffer, messBuf, writeQueue)));
}

When a read finishes

  • We process the data based on the cases described above. We loop through the buffer; when the delimiter ('\n') is reached, the preceding characters are appended to the current message buffer, and that buffer is considered a finished message, so we put it in the writeQueue for the writer worker to consume later. After processing, we clear the buffer and submit a new read task. Here's the snippet:
int startIdx = 0;
int endIdx;
while (frame.indexOf(DELIM, startIdx) != -1) {
    endIdx = frame.indexOf(DELIM, startIdx) + 1;
    messBuf.append(frame, startIdx, endIdx);
    writeQueue.add(messBuf.toString());
    this.messBuf = new StringBuffer();
    startIdx = endIdx;
}
messBuf.append(frame, startIdx, frame.length());
channel.read(buffer, null, this);
  • Every time we finish a read, we check whether the writeQueue has any finished messages for the writer to consume. If it does, we submit a task to consume one, as sketched below.
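A sketch of that check at the end of the read handler's completed() method (it mirrors the write handler shown later, so the exact code in the repository may differ slightly):

// After parsing, if a complete message is queued, hand it to a write task
String message = writeQueue.peek();
if (message != null) {
    writeQueue.remove();
    ByteBuffer writeBuf = ByteBuffer.wrap(message.getBytes());
    worker.submit(() -> channel.write(writeBuf, null,
            new WriteHandler(worker, channel, writeBuf, writeQueue)));
}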

When a write finishes

  • The response is not guaranteed to be written completely in one write, so if there is still something left to write, we continue writing it.

If the current message is really finished, we still need to check the writeQueue again, since on the read side one read triggers at most one write task, yet the stream is continuous and one read might contain multiple messages. Therefore the check after finishing a write is necessary. Alternatively, we could count the number of messages in the read handler and submit that many write tasks.

public void completed(Integer bytesWritten, Object attachment) {
    if (bytesWritten > 0 && writeBuf.hasRemaining()) { // write not finished, continue writing this buffer
        worker.submit(() -> channel.write(writeBuf, null, this));
    } else {
        // Continue to write from the queue
        String message = writeQueue.peek();
        if (message != null) {
            writeQueue.remove();
            ByteBuffer writeBuf = ByteBuffer.wrap(message.getBytes());
            channel.write(writeBuf, null, new WriteHandler(worker, channel, writeBuf, writeQueue));
        }
    }
}

The result

Well, the implementation worked (at least for the test suite I wrote for it):

  • I tried sending messages smaller and bigger than the buffer size (which is 8 bytes by default):
@Test
void buf8_echo1Less8_success() throws Exception {
    final SocketClient client = startClient();
    final String abcde = client.exchange("abcde");
    client.disconnect();

    assertEquals("abcde", abcde);
}
  • Tested it with multiple messages:
final SocketClient client = startClient();
final List<String> abcd = client.exchange("12345678\n987654321\nabc\nd", 4);
client.disconnect();

assertEquals("12345678", abcd.get(0));
assertEquals("987654321", abcd.get(1));
assertEquals("abc", abcd.get(2));
assertEquals("d", abcd.get(3));
  • Tested with many clients:
final ExecutorService executor = Executors.newFixedThreadPool(3);
final int taskNum = 10000;
for (int i = 0; i < taskNum; i++) {
    tasks.add(() -> {
        final SocketClient client = startClient();
        final String res = client.exchange(mess16);
        client.disconnect();
        return res;
    });
}

Maybe the way I test is somewhat wrong; if you notice a mistake, I'm open to feedback. This is just one way to implement it, and it's actually a very naive, slow one. I mainly used strings in my code, so I had to convert the buffer to a string; a better approach would be to deal with the bytes directly. Also, the way I implemented the writeQueue requires bytes to be copied from the buffers into string holders. Modern NIO servers use zero-copy techniques for dealing with buffers; for example, Netty has its own buffer type that stores pointers to the original buffers used for reading. That could be a topic for further research, but I'm quite satisfied with these results for now. Hope this was useful to you.