This can be a cryptocurrency exchange, a payment gateway, atomic swaps or many other use cases that have similar components at their core.
That said, I plan on covering the following topics:
The first topic we are going to cover is setting up an Ethereum Wallet Manager.
This service is very important because it allows our crypto trading platform to support multiple use cases, such as:
Before we proceed, we first need to make sure we meet our environment requirements:
The rest of the requirements come in the form of Docker images and we don't need to install anything else in order to use them, except write a simple Docker Compose configuration file like this one:
docker-compose.yml
version: '3'
services:
  ganache:
    image: trufflesuite/ganache-cli
    command: -m
  redis:
    image: redis:alpine
    ports:
      - "6379:6379"
    command: redis-server --appendonly yes
    volumes:
      - redis:/data
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_CREATE_TOPICS: "command:1:1,address.created:1:1,transaction:1:1,errors:1:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
  redis:
You can start these services by simply running `docker-compose up -d`, which will download the images from the Docker Registry and start each container with the environment variables defined in the compose file above.
Let's see what we are using here and what each component does.
You can't have a wallet manager without a node connected to the Ethereum blockchain. Since we don't want to download the entire blockchain and work with real Ether, we use Ganache (formerly TestRPC).
This will allow us to work faster since the blocks are mined instantly by the service. In production you will want to use something like Geth instead, to connect to the main Ethereum network.
We need some sort of database for storing the addresses we create and watch, and for keeping track of transactions that have been processed. Redis is a great in-memory key/value database for this use case.
In this tutorial we will use the same Redis database to store the private keys that were generated for the addresses, but on a production server you should probably use something like Vault instead.
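The Redis key layout used by the code samples in this tutorial can be summarised with a few helper functions. This is only a sketch: the `keys` object and its function names are hypothetical, while the key patterns themselves are the ones the code in this article reads and writes.

```javascript
// Hypothetical helpers that build the Redis keys used in this tutorial.
// Addresses are lowercased so that checksummed and non-checksummed
// forms of the same address map to the same key.
const keys = {
  // JSON object with the transactions received by an address we generated
  public_address: (address) => `eth:address:public:${address.toLowerCase()}`,
  // private key of a generated address (demo only, use a vault in production)
  private_key: (address) => `eth:address:private:${address.toLowerCase()}`,
  // a transaction that has already been processed
  transaction: (hash) => `eth:transaction:${hash}`,
  // number of the last block that was processed
  last_block: () => 'eth:last-block',
}

console.log(keys.public_address('0xAbC123')) // eth:address:public:0xabc123
```

Keeping the key names in one place makes it harder for two files to drift apart on the exact spelling of a key.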
Apache Kafka will play a central role in our infrastructure as it will handle receiving messages from all services we build and then distribute them to other connected nodes.
For the Ethereum wallet manager service we will communicate through these topics:
The Apache Kafka server can be scaled independently and provides a distributed cluster for handling messages between services.
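To make the topics concrete, here is a small sketch of how they could be named in code and how a message travels through them. The topic names come from `KAFKA_CREATE_TOPICS` in the compose file; the `TOPICS`, `encode` and `decode` names, as well as the `type` and `request_id` fields of the sample message, are assumptions made for illustration.

```javascript
// Topic names taken from KAFKA_CREATE_TOPICS in the compose file.
const TOPICS = Object.freeze({
  COMMAND: 'command',
  ADDRESS_CREATED: 'address.created',
  TRANSACTION: 'transaction',
  ERRORS: 'errors',
})

// Message values are sent as strings, so objects are serialized to JSON
// before they are produced and parsed again when they are consumed.
function encode(message) {
  return JSON.stringify(message)
}

function decode(value) {
  return JSON.parse(value.toString())
}

// Hypothetical command payload: only `meta` is read by the service in this article.
const command = { type: 'create_account', meta: { request_id: '42' } }
const wire_value = encode(command)
console.log(TOPICS.COMMAND, wire_value)
console.log(decode(wire_value).meta.request_id) // 42
```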
Personally, I am very fond of Elixir because you can build extremely reliable distributed applications that are very easy to understand and require minimal effort, but when it comes to Ethereum the package ecosystem is not there yet.
For Ethereum development the best move would be using Node.js/JavaScript. You get a lot more ready-to-use components out of the box, so for those reasons our manager will be a Node.js application.
The first 3 dependencies are easy to understand. We need:
- `web3` to connect to the Ganache/Ethereum node via websockets
- `redis` to connect to the Redis server to store/retrieve data
- `kafka-node` to connect to Zookeeper, get a Kafka broker/server endpoint, and connect to it to produce or consume messages.

The last 2 are there to help us make the code easier to understand and take advantage of async/await when interacting with the Redis server.
Next, we use these packages to connect to the Redis, Ethereum and Kafka servers.
Connecting to Redis is easy: just define a `redis.js` file and write something similar to this code:
// load configuration
const config = require('../../config')
const redis = require('redis')
const bluebird = require('bluebird')
// promisify the redis client using bluebird
bluebird.promisifyAll(redis.RedisClient.prototype);
bluebird.promisifyAll(redis.Multi.prototype);
// create a new redis client and connect to the redis instance
const client = redis.createClient(config.redis_port, config.redis_host);
// if an error occurs, print it to the console
client.on('error', function (err) {
  console.error("[REDIS] Error encountered", err)
})
module.exports = client;
If you think connecting to Redis was easy, then connecting to the Ethereum node using web3 will blow your mind. Add this into an `ethereum.js` file:
const config = require('../../config')
const Web3 = require('web3')
module.exports = new Web3(config.uri)
Kafka, on the other hand, needs to consume or produce messages from the queue so we also need to define how these are configured.
Add a new `queue.js` file like this:
const kafka = require('kafka-node')
const config = require('../../config')

// configure how the consumers should connect to the broker/servers
// each consumer creates its own connection to a broker
const default_options = {
  host: config.kafka_zookeeper_uri,
  autoCommit: true,
  fromOffset: 'earliest',
}

module.exports.consumer = (group_id = "ethereum_wallet_manager_consumer", topics = [], opts = {}) => {
  const options = Object.assign({ groupId: group_id }, default_options, opts)
  const consumer = new kafka.ConsumerGroup(options, topics)
  return consumer
}

// configure how the producer connects to the Apache Kafka broker
// initiate the connection to the kafka client
const client = new kafka.Client(config.kafka_zookeeper_uri, config.kafka_client_id)
module.exports.client = client

const producer = new kafka.Producer(client)

// add a listener to the ready event
async function on_ready(cb) {
  producer.on('ready', cb)
}

// define a method to send multiple messages to the given topic
// this will return a promise that will resolve with the response from Kafka
// messages are converted to JSON strings before they are added in the queue
async function send(topic, messages) {
  return new Promise((resolve, reject) => {
    // convert objects to JSON strings
    const payloads = messages.map((message) => JSON.stringify(message))
    // add the messages to the given topic
    producer.send([{ topic, messages: payloads }], function (err, data) {
      if (err) return reject(err)
      resolve(data)
    })
  })
}

// expose only these methods to the rest of the application and abstract away
// the implementation of the producer to easily change it later
module.exports.on_ready = on_ready
module.exports.send = send
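All three connection files read their settings from a `config` module that is not shown in this article. A minimal sketch of what it could look like follows. The property names match the ones read by `redis.js`, `ethereum.js` and `queue.js`, but the `load_config` function, the environment variable names and the default values are assumptions for a local setup and should be adapted to your own environment.

```javascript
// config.js (hypothetical): the property names are the ones read by
// redis.js, ethereum.js and queue.js; the environment variable names and
// the defaults are assumptions for a local docker-compose setup.
function load_config(env = process.env) {
  return {
    // Redis connection
    redis_host: env.REDIS_HOST || '127.0.0.1',
    redis_port: parseInt(env.REDIS_PORT, 10) || 6379,
    // websocket endpoint of the Ganache/Ethereum node
    uri: env.ETHEREUM_URI || 'ws://127.0.0.1:8545',
    // Zookeeper endpoint used to discover the Kafka brokers
    kafka_zookeeper_uri: env.ZOOKEEPER_URI || '127.0.0.1:2181',
    // name this service uses to identify itself to Kafka
    kafka_client_id: env.KAFKA_CLIENT_ID || 'ethereum_wallet_manager',
  }
}

module.exports = load_config()
```

Reading the values from environment variables keeps the same code usable both on a developer machine and inside a container, where hosts such as `redis` or `zookeeper` replace `127.0.0.1`.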
We can now start to tackle each functionality on its own and explain the implications of each.
Exchanges and payment gateways need to generate a new address for their clients when they want to start depositing funds to the service or pay for some product.
Generating a new unused Ethereum address is a basic requirement for any cryptocurrency service so let's see what we can do to get this out of the way.
First, we create a `commands.js` file where we subscribe to commands from the message queue.
The goal is to have the following process:
1. Connect to the `command` topic and listen for new `create_account` commands.
2. When a new `create_account` command arrives, create the new private/public keys and persist them in a vault.
3. Generate an `account_created` message and send it back to the queue under the `address.created` topic.

const web3 = require("./ethereum")
const redis = require('./redis')
const queue = require('./queue')

/**
 * Listen to new commands from the queue
 */
async function listen_to_commands() {
  const queue_consumer = queue.consumer('eth.wallet.manager.commands', ['command'])
  // process messages
  queue_consumer.on('message', async function (topic_message) {
    try {
      const message = JSON.parse(topic_message.value)
      // create the new address with some reply metadata to match the response to the request
      const resp = await create_account(message.meta)
      // if successful then post the response to the queue
      if (resp) {
        await queue.send('address.created', [resp])
      }
    } catch (err) {
      // in case something goes wrong catch the error and send it back in the 'errors' topic
      console.error(topic_message, err)
      queue.send('errors', [{type: 'command', request: topic_message, error_code: err.code, error_message: err.message, error_stack: err.stack}])
        .catch(console.error)
    }
  })
  return queue_consumer
}

/**
 * Create a new ethereum address and return the address
 */
async function create_account(meta = {}) {
  // generate the address
  const account = await web3.eth.accounts.create()
  // disable checksum when storing the address
  const address = account.address.toLowerCase()
  // save the public address in Redis without any transactions received yet
  await redis.setAsync(`eth:address:public:${address}`, JSON.stringify({}))
  // Store the private key in a vault.
  // For demo purposes we use the same Redis instance, but this should be changed in production
  await redis.setAsync(`eth:address:private:${address}`, account.privateKey)
  return Object.assign({}, meta, {address: account.address})
}

module.exports.listen_to_commands = listen_to_commands
To use this, simply call the `listen_to_commands()` method from this file and send a message to the queue under the `command` topic.
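Because the reply copies the `meta` object of the command next to the generated address, a caller can place a correlation id in `meta` and use it to match replies to requests. The sketch below illustrates the idea without a running Kafka broker. The `type` and `request_id` fields, the helper names and the sample address are all hypothetical; the service in this article only reads `meta`.

```javascript
// Build a message for the `command` topic. The `type` and `request_id`
// fields are assumptions made for this example.
function build_create_account_command(request_id) {
  return { type: 'create_account', meta: { request_id } }
}

// Mirrors how create_account() builds its reply: the meta fields are
// copied into the response next to the generated address.
function build_reply(meta, address) {
  return Object.assign({}, meta, { address })
}

// Callbacks waiting for a reply, indexed by request_id
const pending = new Map()

function track(command, on_reply) {
  pending.set(command.meta.request_id, on_reply)
}

// Call this for every message consumed from the `address.created` topic.
// Returns true when the reply matched a pending request.
function handle_reply(reply) {
  const on_reply = pending.get(reply.request_id)
  if (!on_reply) return false
  pending.delete(reply.request_id)
  on_reply(reply)
  return true
}

// Simulated round trip; with a real broker the command would be sent
// with queue.send('command', [command]) instead.
const command = build_create_account_command('req-1')
track(command, (reply) => console.log('new address:', reply.address))
handle_reply(build_reply(command.meta, '0x0000000000000000000000000000000000000001'))
```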
Our wallet is not yet complete - we still need to be notified when any address we generated has received deposits.
For this, the Ethereum web3 client provides the `newBlockHeaders` subscription.
If the service breaks down, some blocks could get lost so we need to check if we have to sync our wallet to the latest block of the network.
Create a `sync_blocks.js` file and add the following code:
const web3 = require('./ethereum')

/**
 * Sync blocks and start listening for new blocks
 * @param {Number} current_block_number - The last block processed
 * @param {Object} opts - A list of options with callbacks for events
 */
async function sync_blocks(current_block_number, opts) {
  // first sync the wallet to the latest block
  let latest_block_number = await web3.eth.getBlockNumber()
  let synced_block_number = await sync_to_block(current_block_number, latest_block_number, opts)
  // subscribe to new blocks
  web3.eth.subscribe('newBlockHeaders', (error, result) => error && console.log(error))
    .on("data", async function(blockHeader) {
      return await process_block(blockHeader.number, opts)
    })
  return synced_block_number
}

// Load all data about the given block and call the callbacks if defined
async function process_block(block_hash_or_id, opts) {
  // load block information by id or hash
  const block = await web3.eth.getBlock(block_hash_or_id, true)
  // call the onTransactions callback if defined and wait for it to finish,
  // so the block is only marked as processed after its transactions were handled
  if (opts.onTransactions) await opts.onTransactions(block.transactions)
  // call the onBlock callback if defined
  if (opts.onBlock) await opts.onBlock(block_hash_or_id)
  return block
}

// Traverse all unprocessed blocks between the current index and the latest block number
async function sync_to_block(index, latest, opts) {
  if (index >= latest) {
    return index;
  }
  await process_block(index + 1, opts)
  return await sync_to_block(index + 1, latest, opts)
}

module.exports = sync_blocks
Here we only use the Ethereum connection to pass through every block from our current block number to the latest block number of the blockchain.
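The catch-up step can also be written as a plain loop, which may be easier to follow than the recursive version when the service has been offline for many blocks. This is a sketch and not part of the original service: `catch_up` is a hypothetical name, and the block processor is passed in as a function so the loop can be tried without an Ethereum node.

```javascript
// Process every block after `current` up to and including `latest`,
// one at a time and in order. `process_block` is injected so the loop
// can be exercised without a node.
async function catch_up(current, latest, process_block) {
  // values read from Redis are strings, so normalise to a number first
  let index = Number(current)
  while (index < latest) {
    index += 1
    await process_block(index)
  }
  // the number of the last block that was processed
  return index
}

// Usage with a stand-in for the real block processor
const seen = []
catch_up(3, 6, async (block_number) => { seen.push(block_number) })
  .then((last) => console.log('synced to', last, 'after processing', seen))
```

Normalising the starting block with `Number()` matters because adding 1 to a string such as `'3'` would produce `'31'` instead of `4`.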
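Once we've synced to the latest block number, we subscribe to new blocks. For each block, we execute the following callback functions in order to process the new block and the list of transactions in it:
- `onTransactions`, called with the list of transactions in the block
- `onBlock`, called with the number or hash of the block that was processed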
In order to use this, we need to define what we want to do for each of these events.
Usually, you'd want to follow this process:
These points are addressed in this final code sample:
// only the static utils of the web3 package are needed in this file
const web3 = require("web3")
const redis = require('./redis')
const queue = require('./queue')
const sync_blocks = require('./sync_blocks')

/**
 * Start syncing blocks and listen for new transactions on the blockchain
 */
async function start_syncing_blocks() {
  // start from the last block number processed or 0 (you can use the current block before deploying for the first time)
  // Redis returns strings, so convert the stored value to a number
  const stored_block_number = await redis.getAsync('eth:last-block')
  const last_block_number = parseInt(stored_block_number, 10) || 0
  // start syncing blocks
  return sync_blocks(last_block_number, {
    // for every new block update the latest block value in redis
    onBlock: update_block_head,
    // for new transactions check each transaction and see if it's new
    onTransactions: async (transactions) => {
      for (const transaction of transactions) {
        await process_transaction(transaction)
      }
    }
  })
}

// save the latest block on redis
async function update_block_head(head) {
  return await redis.setAsync('eth:last-block', head)
}

// process a new transaction
async function process_transaction(transaction) {
  // contract creation transactions have no recipient, so there is nothing to check
  if (!transaction.to) {
    return false
  }
  const address = transaction.to.toLowerCase()
  const amount_in_ether = web3.utils.fromWei(transaction.value, 'ether')
  // check if the receiving address has been generated by our wallet
  const watched_address = await redis.existsAsync(`eth:address:public:${address}`)
  if (watched_address !== 1) {
    return false
  }
  // then check if it's a new transaction that should be taken into account
  const transaction_exists = await redis.existsAsync(`eth:transaction:${transaction.hash}`)
  if (transaction_exists === 1) {
    return false
  }
  // update the list of transactions for that address
  const data = await redis.getAsync(`eth:address:public:${address}`)
  let addr_data = JSON.parse(data)
  addr_data[transaction.hash] = {
    value: amount_in_ether
  }
  await redis.setAsync(`eth:address:public:${address}`, JSON.stringify(addr_data))
  // Redis stores strings, so serialize the transaction before saving it
  await redis.setAsync(`eth:transaction:${transaction.hash}`, JSON.stringify(transaction))
  // move funds to the cold wallet address
  // const cold_txid = await move_to_cold_storage(address, amount_in_ether)
  // send notification to the kafka server
  await queue.send('transaction', [{
    txid: transaction.hash,
    value: amount_in_ether,
    to: transaction.to,
    from: transaction.from,
    //cold_txid: cold_txid,
  }])
  return true
}

module.exports = start_syncing_blocks
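The two checks in `process_transaction` (is the receiving address one of ours, and has this transaction been seen before) can be isolated from Redis and Kafka to see how they behave. The sketch below uses in-memory `Set` objects as stand-ins for the Redis `EXISTS` calls. The `should_process` function and the sample address and hash are hypothetical.

```javascript
// In-memory stand-ins for the Redis keys: addresses generated by the
// wallet and hashes of transactions that were already processed.
const watched = new Set(['0xabc'])
const processed = new Set()

// Returns true only the first time a transaction to a watched address is seen.
function should_process(transaction) {
  // contract creation transactions have no recipient
  if (!transaction.to) return false
  const address = transaction.to.toLowerCase()
  // ignore transactions sent to addresses we did not generate
  if (!watched.has(address)) return false
  // ignore transactions that were already handled (for example after a re-sync)
  if (processed.has(transaction.hash)) return false
  processed.add(transaction.hash)
  return true
}

const tx = { hash: '0x01', to: '0xABC', value: '1000000000000000000' }
console.log(should_process(tx)) // true
console.log(should_process(tx)) // false, the same hash was seen before
```

The second check is what makes re-processing a block safe: if the service restarts and syncs a block it has already handled, no duplicate notification is sent.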
And that would be it for processing new transactions on Ethereum and completing this first step towards setting up a crypto trading platform.
This service can be further improved with:
Adding the features above (and many others) can be a good exercise for you to try until we release the next piece in the series.
I hope this article has been useful and you now better understand how services like crypto exchanges and crypto payment gateways work.
At Around25 we've been developing such services and components for some time now (actually, as soon as we learned about blockchain technology), so we can say we're quite ready to share some know-how with those of you who want to move faster when building a crypto trading platform or application.
In fact, here's one of my colleagues, Darius, with a tutorial on how to build a private Ethereum blockchain from scratch (Part 1 - Geth).