ActiveFS Examples

To run the examples in an IDE, you need to clone the ActiveJ project:

$ git clone https://github.com/activej/activej

Then import it as a Maven project and check out branch v2.2. Before running the examples, build the project (Ctrl + F9 in IntelliJ IDEA).

Server Setup

Let’s have a closer look at the Server Setup Example. To make setup and launching as simple as possible, there is a special SimpleTcpServerLauncher, an ActiveJ Launcher implementation (an abstraction over the application's main method). It lets you set up applications with minimal boilerplate, so all you need to do to set up an FS server is override a few Launcher methods:

  • onInit - runs prior to application start
  • createConfig - provides the server configuration (here, the storage path and the listen address)
  • run - the Launcher’s main method, represents the business logic

Then launch the Launcher.

public class ServerSetupExample extends SimpleTcpServerLauncher {
	private Path storage;

	@Override
	protected void onInit(Injector injector) throws Exception {
		storage = Files.createTempDirectory("server_storage");
	}

	@Override
	protected Config createConfig() {
		return super.createConfig()
				// path to the temporary storage directory created in onInit()
				.with("activefs.path", storage.toString())
				// port on which the server will listen for TCP connections
				.with("activefs.listenAddresses", "6732");
	}

	@Override
	protected void run() throws Exception {
		awaitShutdown();
	}

	public static void main(String[] args) throws Exception {
		Launcher launcher = new ServerSetupExample();
		launcher.launch(args);
	}
}

See full source code on GitHub

File Upload

FileUploadExample also extends Launcher and overrides its methods in a similar way.

In this example we will use an ActiveFs instance, which depends on the asynchronous ActiveJ Eventloop. To simplify working with dependencies, we will use the ActiveInject DI library: it is lightning-fast, efficient, and perfectly compatible with Launcher. So we simply @Inject the required instances and declare @Provides factory methods for them. Just like in the previous example, we also override several Launcher methods, including onInit and run.
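
For orientation, the dependency wiring of such a client typically looks like the sketch below. Note that this is not the exact example code: the RemoteActiveFs factory call, the server address, and the field names are assumptions here, so refer to the full sources on GitHub for the real wiring.

public class FileUploadExample extends Launcher {
	// both instances are created by the @Provides methods below and injected by ActiveInject
	@Inject
	private ActiveFs client;

	@Inject
	private Eventloop eventloop;

	@Provides
	Eventloop eventloop() {
		return Eventloop.create();
	}

	@Provides
	ActiveFs client(Eventloop eventloop) {
		// assumption: the client talks to the server from the previous example on localhost:6732
		return RemoteActiveFs.create(eventloop, new InetSocketAddress("localhost", 6732));
	}

	// the remaining Launcher methods (onInit, run, main) are omitted here
}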

This example also utilizes the ActiveJ CSP component, particularly the ChannelFileReader class, which allows reading binary data from files asynchronously (its counterpart, ChannelFileWriter, is used in the download example below).

You can see the full example sources on GitHub; here we will consider only the upload process, which is defined in the overridden run method.

@Override
protected void run() throws Exception {
	ExecutorService executor = newSingleThreadExecutor();
	CompletableFuture<Void> future = eventloop.submit(() ->
			// consumer result here is a marker of it being successfully uploaded
			ChannelFileReader.open(executor, clientFile)
					.then(cfr -> cfr.streamTo(client.upload(FILE_NAME, EXAMPLE_TEXT.length())))
					.whenResult(() -> System.out.printf("%nFile '%s' successfully uploaded%n%n", FILE_NAME))
	);
	future.get();
	executor.shutdown();
}
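
Note a detail of the design here: the upload itself runs inside the eventloop thread. eventloop.submit schedules the asynchronous operation and returns a CompletableFuture, so the blocking future.get() call simply makes the Launcher's main thread wait until the upload completes before the executor is shut down.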

See full source code on GitHub

File Download

FileDownloadExample is implemented similarly to the File Upload example. You can see the full example sources on GitHub; here we will consider only the download process, which is defined in the overridden run method.

@Override
protected void run() throws Exception {
	ExecutorService executor = newSingleThreadExecutor();
	CompletableFuture<Void> future = eventloop.submit(() ->
			ChannelSupplier.ofPromise(client.download(REQUIRED_FILE))
					.streamTo(ChannelFileWriter.open(executor, clientStorage.resolve(DOWNLOADED_FILE)))
					.whenResult(() -> System.out.printf("%nFile '%s' successfully downloaded to '%s'%n%n",
							REQUIRED_FILE, clientStorage))
	);
	future.get();
	executor.shutdown();
}

See full source code on GitHub

Distributed Cluster Storage

With ActiveFS you can easily create a distributed cluster file storage with high fault tolerance. We will use Docker to launch three virtual servers and one client. The storage will support file uploads with automatic repartitioning according to the provided rule and replication count.

The first thing we need to do is create a launcher class ClusterTcpServerLauncher for our server. It extends SimpleTcpServerLauncher to get all the required instances: an ActiveFsServer, a local ActiveFs, an AsyncHttpServer for the GUI that simplifies working with your storage, and other helper instances. In ClusterTcpServerLauncher we only need to set up the utilities for repartitioning management: task schedulers, a ClusterRepartitionController, and FsPartitions for tracking alive partitions and their statuses. The partitions communicate via the TCP protocol, while the GUI server uses HTTP.

@Provides
@Eager
@Named("repartition")
EventloopTaskScheduler repartitionScheduler(Config config, ClusterRepartitionController controller) {
	return EventloopTaskScheduler.create(controller.getEventloop(), controller::repartition)
			.withInitializer(ofEventloopTaskScheduler(config.getChild("activefs.repartition")));
}

@Provides
@Eager
@Named("clusterDeadCheck")
EventloopTaskScheduler deadCheckScheduler(Config config, FsPartitions partitions) {
	return EventloopTaskScheduler.create(partitions.getEventloop(), partitions::checkDeadPartitions)
			.withInitializer(ofEventloopTaskScheduler(config.getChild("activefs.repartition.deadCheck")));
}

@Provides
ClusterRepartitionController repartitionController(Config config, ActiveFsServer localServer, FsPartitions partitions) {
	String localPartitionId = first(partitions.getAllPartitions());
	assert localPartitionId != null;

	return ClusterRepartitionController.create(localPartitionId, partitions)
			.withInitializer(ofClusterRepartitionController(config.getChild("activefs.repartition")));
}

@Provides
FsPartitions fsPartitions(Config config, Eventloop eventloop, @Optional ServerSelector serverSelector, ActiveFs fs) {
	Map<Object, ActiveFs> partitions = new LinkedHashMap<>();
	partitions.put(config.get("activefs.repartition.localPartitionId"), fs);

	return FsPartitions.create(eventloop, partitions)
			.withServerSelector(nullToDefault(serverSelector, RENDEZVOUS_HASH_SHARDER))
			.withInitializer(ofFsPartitions(config.getChild("activefs.cluster")));
}
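
As with any Launcher, such a server partition can also be started directly from a main method, for example (a minimal sketch; in this tutorial the Docker containers described below start the launchers for us):

public static void main(String[] args) throws Exception {
	Launcher launcher = new ClusterTcpServerLauncher();
	launcher.launch(args);
}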

Now we can move on to creating a client launcher, ClusterTcpClientLauncher. Here we need to provide a task scheduler that detects dead partitions. Similarly to the server launcher, we also need to provide an AsyncHttpServer for the GUI and FsPartitions for managing partitions. Finally, we need an instance of the ClusterActiveFs class, an ActiveFs implementation that works with the other partitions as a cluster and provides some redundancy and fail-safety capabilities.

@Provides
@Eager
@Named("clusterDeadCheck")
EventloopTaskScheduler deadCheckScheduler(Config config, FsPartitions partitions) {
	return EventloopTaskScheduler.create(partitions.getEventloop(), partitions::checkDeadPartitions)
			.withInitializer(ofEventloopTaskScheduler(config.getChild("activefs.repartition.deadCheck")));
}

@Provides
@Eager
AsyncHttpServer guiServer(Eventloop eventloop, AsyncServlet servlet, Config config) {
	return AsyncHttpServer.create(eventloop, servlet)
			.withInitializer(ofHttpServer(config.getChild("activefs.http.gui")));
}

@Provides
AsyncServlet guiServlet(ActiveFs activeFs) {
	return ActiveFsGuiServlet.create(activeFs, "Cluster FS Client");
}

@Provides
ActiveFs remoteActiveFs(Eventloop eventloop, FsPartitions partitions, Config config) {
	return ClusterActiveFs.create(partitions)
			.withInitializer(ofClusterActiveFs(config.getChild("activefs.cluster")));
}

@Provides
FsPartitions fsPartitions(Eventloop eventloop, Config config) {
	return FsPartitions.create(eventloop)
			.withInitializer(ofFsPartitions(config.getChild("activefs.cluster")));
}

Here’s the architecture of our distributed P2P storage:

You can create as many partitions as you wish. The cluster client is optional, as you can create an alternative client implementation that supports the ActiveFS protocol.

To launch the example, run the following scripts to create the Docker images and build the containers (run all the scripts from the activej/launchers/fs directory):

# building two images for server and client
docker build -t cluster-server -f ClusterServerDockerfile .
docker build -t cluster-client -f ClusterClientDockerfile .
# launching all the server and client instances in the background
docker-compose up -d

The containers will be built with the following configurations:

  • Server1: TCP-connection port 9001, HTTP GUI port 8081
  • Server2: TCP-connection port 9002, HTTP GUI port 8082
  • Server3: TCP-connection port 9003, HTTP GUI port 8083
  • Client: HTTP GUI port 8080
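
Once the containers are running (and assuming docker-compose publishes these ports to the host), the client GUI should be reachable at http://localhost:8080 and the GUIs of the individual servers at ports 8081-8083.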

Use these commands to manage the containers:

# to stop a single container:
docker-compose stop server1
# to stop all the containers:
docker-compose down
# check container status:
docker-compose ps

See full source code on GitHub