Building The Vault Manager for a Crypto Trading Platform

Written by
Cosmin Harangus
Published on
October 1, 2018

So far we've looked into how to create an Ethereum Wallet, a Bitcoin Wallet and how to handle trades.

In this article we will learn how to build a vault manager for our crypto trading platform, so that we'll be able to:

  • create and manage user accounts and balances in order to support receiving deposits
  • generate addresses
  • update user balances
  • make withdrawals

Managing Wallets

The vault manager sends commands to each type of wallet we support, then it handles all the command replies and deposit events sent from the wallets.

Any crypto platform needs to support the following operations:

  • Generating deposit addresses for any supported chain;
  • Updating user balance for each coin when the user deposits any amount into the generated address;
  • Keeping balances locked until the deposits are completed;
  • Withdrawing from the user balance to an external address.

The vault manager only interacts with Apache Kafka, where it sends messages to and receives messages from the wallets, and with a database, where it stores the balances of each user. This means we can build it in any programming language we want.

For this tutorial, we will use Golang.

Basic Setup of the Vault Manager

One aspect of the setup that was missing from the trading engine article is how to make your Golang application easily configurable from either a file or environment variables. On top of that, we have to define different CLI commands to interact with it.

For this we will use two very important open source libraries from the Golang ecosystem.

You may have already used or heard about them; if you haven't, I suggest you get familiar with them if you want to build modern Golang applications. I'm talking about:

  • Viper, for loading the configuration
  • Cobra, for defining the CLI commands

Read their documentation to learn about their detailed usage but for now let's see how we can use them for our vault manager.

Below you have the configuration file used to define how we connect to the database and how we interact with Kafka.

.vault_manager.yml

server: 
  jwt_token_secret: notsosecretjwttokenpass
  monitoring: 
    enabled: false
    host: 0.0.0.0
    port: 6060

chains:
  eth: 
    tx: eth.wallet.transaction
    tx_confirmed: eth.wallet.transaction.confirmed
    command: eth.wallet.command
    command_reply: eth.wallet.command.reply
  btc: 
    tx: btc.wallet.transaction
    tx_confirmed: btc.wallet.transaction.confirmed
    command: btc.wallet.command
    command_reply: btc.wallet.command.reply

brokers:
  consumers:
    events: 
      name: vault_manager
      hosts: 
        - kafka:9092
      topics:
        - eth.wallet.transaction
        - eth.wallet.transaction.confirmed
        - eth.wallet.command.reply
        - btc.wallet.transaction
        - btc.wallet.transaction.confirmed
        - btc.wallet.command.reply
        - errors
  producers:
    commands: 
      hosts: 
        - kafka:9092
      topics: 
        - eth.wallet.command
        - btc.wallet.command

database:
  host: database
  username: vault
  password: vault_pass
  name: vault
  port: 5432 

In order to use this configuration file, we also need to create a structure that maps every item so that it can be accessed within the application.

config/config.go

package config

import (
	"log"

	"github.com/spf13/viper"
)

// Config structure
type Config struct {
	Brokers  BrokersConfig
	Server   ServerConfig
	Chains   map[string]ChainConfig
	Database DatabaseConfig
}

// ConsumerConfig structure
type ConsumerConfig struct {
	Name   string
	Hosts  []string
	Topics []string
}

// ProducerConfig structure
type ProducerConfig struct {
	Hosts  []string
	Topics []string
}

// ChainConfig structure
type ChainConfig struct {
	Tx           string
	TxConfirmed  string `mapstructure:"tx_confirmed"`
	Command      string
	CommandReply string `mapstructure:"command_reply"`
}

// BrokersConfig structure
type BrokersConfig struct {
	Consumers map[string]ConsumerConfig
	Producers map[string]ProducerConfig
}

// TopicConfig structure
type TopicConfig struct {
	Broker string
	Topic  string
}

// ServerConfig structure
type ServerConfig struct {
	JWTTokenSecret string `mapstructure:"jwt_token_secret"`
	Monitoring     MonitoringConfig
}

// MonitoringConfig structure
type MonitoringConfig struct {
	Enabled bool
	Host    string
	Port    string
}

// DatabaseConfig structure
type DatabaseConfig struct {
	Host     string
	Username string
	Password string
	Name     string
	Port     string
}

// LoadConfig Load server configuration from the yaml file
func LoadConfig(viperConf *viper.Viper) Config {
	var config Config

	err := viperConf.Unmarshal(&config)
	if err != nil {
		log.Fatalf("unable to decode into struct, %v", err)
	}
	return config
}
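
The consumer topic list in the YAML file repeats the topic names already declared under chains. As a minimal sketch, assuming you would rather derive that list than maintain it by hand, a helper could build it from the chains map. ConsumerTopics is a hypothetical name and not part of the original project, and the "errors" topic is taken from the configuration file above.

```go
package main

import (
	"fmt"
	"sort"
)

// ChainConfig mirrors the structure from config/config.go.
type ChainConfig struct {
	Tx           string
	TxConfirmed  string
	Command      string
	CommandReply string
}

// ConsumerTopics is a hypothetical helper: it returns every topic the vault
// manager has to listen to for the given chains, plus the shared errors topic.
func ConsumerTopics(chains map[string]ChainConfig) []string {
	topics := []string{"errors"}
	for _, chain := range chains {
		topics = append(topics, chain.Tx, chain.TxConfirmed, chain.CommandReply)
	}
	// map iteration order is random, so sort to get a stable result
	sort.Strings(topics)
	return topics
}

func main() {
	chains := map[string]ChainConfig{
		"eth": {
			Tx:           "eth.wallet.transaction",
			TxConfirmed:  "eth.wallet.transaction.confirmed",
			Command:      "eth.wallet.command",
			CommandReply: "eth.wallet.command.reply",
		},
	}
	for _, topic := range ConsumerTopics(chains) {
		fmt.Println(topic)
	}
}
```

The command topics are left out on purpose, since the vault manager only produces to them.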

Now that we have the configuration in place, we also need to define a command to start our application.

Using Cobra, we defined two CLI commands:

  • > ./vault_manager database - connects to the database and updates the schema to match our models.
  • > ./vault_manager start - connects to the database and to Kafka and listens for requests via the http API.

In order to connect to the database and set up our models, we used GORM as our ORM.

You can use anything you want for this, depending on your requirements.

The schema of the vault manager for your crypto trading platform can get pretty complex based on the type of platform you want to build and the features it should support.

You could have clients that can register for an account and have an API key to create users, define wallets, generate addresses and define constraints or notifications on each of these entities based on the flow of coins and tokens in your system.

For now, we will only support the basic use cases of such a system. This includes the following:

  • Creating a new user/account
  • Generating an address for that account for a specific chain
  • Updating the balance of the address on a deposit
  • Withdrawing any available amount from an existing address

Database models

The Account model stores relevant data about a user.

model/account.go

package model

import (
	"github.com/jinzhu/gorm"
	uuid "github.com/nu7hatch/gouuid"
)

// Account structure
type Account struct {
	gorm.Model
	AccountID  string   `gorm:"unique; not null" json:"account_id"`
	Email      string   `json:"email"`
	Name       string   `json:"name"`
}

// NewAccount creates a new account for a merchant
func NewAccount(name string, email string) *Account {
	apiKey, _ := uuid.NewV4()
	return &Account{
		Name:       name,
		AccountID:  apiKey.String(),
		Email:      email,
	}
}

We will use the generated uuid to view balances and addresses for a user.

Generated addresses are saved using the address model below:

model/address.go

package model

import "github.com/jinzhu/gorm"

// Address structure
type Address struct {
	gorm.Model `json:"-"`
	PublicKey  string  `gorm:"unique" json:"public_key"`
	AccountID  uint    `gorm:"type:bigint REFERENCES accounts(id)" json:"account_id"`
	Account    Account `json:"-"`
	Chain      string  `json:"chain"`
}

// NewAddress creates a new address for an account
func NewAddress(publicKey, chain string, accountID uint) *Address {
	return &Address{
		PublicKey: publicKey,
		Chain:     chain,
		AccountID: accountID,
	}
}

Each interaction with an address - such as a deposit, a withdrawal, etc. - should be preserved in order to ensure we have a complete history of what happened with the funds from that address.

model/address_event.go

package model

import "github.com/jinzhu/gorm"

// AddressEvent structure
type AddressEvent struct {
	gorm.Model
	TxID       string
	Type       string
	Symbol     string
	Amount     string
	Info       string
	CostAmount string
	CostSymbol string
	// AddressID is a plain uint so that it matches Address.ID and AddressBalance.AddressID
	AddressID uint    `gorm:"type:bigint REFERENCES addresses(id)" json:"address_id"`
	Address   Address `json:"-"`
}

// NewAddressEvent creates a new event (deposit, withdraw_request, withdraw) for an address
func NewAddressEvent(addressID uint, eventType, symbol, amount, txid, info, costAmount, costSymbol string) *AddressEvent {
	return &AddressEvent{
		AddressID:  addressID,
		Amount:     amount,
		Symbol:     symbol,
		Info:       info,
		TxID:       txid,
		Type:       eventType,
		CostAmount: costAmount,
		CostSymbol: costSymbol,
	}
}

Now that we have every event that can alter the balance for an address, we need somewhere to store the current value of the address based on the sum of these events.

For that, we will use the address balance model where we keep the current value of each token or coin deposited in an address.

model/address_balance.go

package model

import "github.com/jinzhu/gorm"

// AddressBalance structure
type AddressBalance struct {
	gorm.Model
	Balance       string
	LockedBalance string
	AddressID     uint `gorm:"type:bigint REFERENCES addresses(id)"`
	Address       Address
	LastEventID   uint `gorm:"type:bigint REFERENCES address_events(id)"`
	LastEvent     AddressEvent
	Chain         string
	Coin          string
}

// NewAddressBalance creates a new address balance for an address
func NewAddressBalance(coin, chain string, addressID, lastEventID uint, balance, lockedBalance string) *AddressBalance {
	return &AddressBalance{
		Coin:          coin,
		Chain:         chain,
		AddressID:     addressID,
		LastEventID:   lastEventID,
		Balance:       balance,
		LockedBalance: lockedBalance,
	}
}

Now that we have all the models, we can use the Gorm ORM to connect to the database and define a few basic methods to interact with each model.

We can also define custom methods for each model that can make querying easier.

model/repo.go

package model

import (
	"fmt"
	"log"

	"github.com/jinzhu/gorm"

	// import support for postgres for gorm
	_ "github.com/jinzhu/gorm/dialects/postgres"
)

// Repo structure
type Repo struct {
	Conn *gorm.DB
}

var repo *Repo

// NewRepo opens a new database connection and stores it as the active repo
func NewRepo(host, port, user, pass, name string) *Repo {
	conn, err := gorm.Open("postgres", fmt.Sprintf("host=%s port=%s user=%s password=%s dbname=%s sslmode=disable", host, port, user, pass, name))
	if err != nil {
		// include the underlying error so a failed connection is easier to debug
		log.Panicf("failed to connect to the database: %v", err)
	}
	repo = &Repo{Conn: conn}
	return repo
}

// GetRepo returns the existing repo
func GetRepo() *Repo {
	if repo == nil {
		panic("no active database connection")
	}
	return repo
}

// SetRepo updates the local active repo
func SetRepo(rep *Repo) {
	repo = rep
}

// Create a record
func (repo *Repo) Create(model interface{}) error {
	db := repo.Conn.Create(model)
	return db.Error
}

// FindAll loads the records of a table, optionally filtered by a raw where clause
func (repo *Repo) FindAll(model interface{}, where string) error {
	db := repo.Conn
	if where != "" {
		db = db.Where(where)
	}
	return db.Find(model).Error
}

// FindByID a record of a table
func (repo *Repo) FindByID(model interface{}, id uint) error {
	db := repo.Conn.Where("id = ?", id).Find(model)
	return db.Error
}

// Update a record of a table
func (repo *Repo) Update(model interface{}) error {
	db := repo.Conn.Save(model)
	return db.Error
}

// Delete a record from a table
func (repo *Repo) Delete(model interface{}, id uint) error {
	db := repo.Conn.Where("id = ?", id).Delete(model)
	return db.Error
}

Message Queue

In a previous article we connected to Apache Kafka with a consumer and a producer. You can use the same concepts to create a structure that is easier to work with; to keep things simple, we leave that as an individual exercise.

In our project we have defined the following interfaces that we use to communicate with Apache Kafka:

net/kafka_types.go

package net

import "github.com/Shopify/sarama"

// KafkaProducer interface
type KafkaProducer interface {
	Start() error
	Input() chan<- *sarama.ProducerMessage
	Errors() <-chan *sarama.ProducerError
	Close() error
}

// KafkaConsumer interface
type KafkaConsumer interface {
	Start() error
	GetMessageChan() <-chan *sarama.ConsumerMessage
	MarkOffset(msg *sarama.ConsumerMessage, meta string)
	ResetOffset(topic string, partition int32, offset int64, meta string) error
	Close() error
}

To help us interact with the wallet managers (such as the Ethereum Wallet Manager) through Kafka, it is also advisable to implement a structure similar to the one below that can send commands to the wallets and process incoming transactions.

queue/types.go

package queue

// Queue executes commands on the wallets and can wait for replies from them
type Queue interface {
	Listen()
	Execute(command Command) error
	ExecuteAndWait(command Command) (chan map[string]interface{}, error)
	SetTransactionHandler(func(*Transaction) error)
	SetConfirmationHandler(func(*Transaction) error)
}

// Command structure
type Command struct {
	CommandTopic string                 `json:"-"`
	Command      string                 `json:"command"`
	Data         map[string]interface{} `json:"data"`
	Meta         map[string]interface{} `json:"meta"`
	ReplyTopic   string                 `json:"reply_topic,omitempty"`
}

// Transaction structure
type Transaction struct {
	Symbol string
	TxID   string
	Value  string
	To     string
}
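
To see what the struct tags on Command produce, here is a small sketch that encodes a command with encoding/json. The Encode helper and the sample values are assumptions made for illustration; the exact payload your wallets expect depends on how they were implemented in the earlier articles.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Command is copied from queue/types.go.
type Command struct {
	CommandTopic string                 `json:"-"`
	Command      string                 `json:"command"`
	Data         map[string]interface{} `json:"data"`
	Meta         map[string]interface{} `json:"meta"`
	ReplyTopic   string                 `json:"reply_topic,omitempty"`
}

// Encode is a hypothetical helper that returns the JSON payload
// that would be published to the command topic.
func Encode(cmd Command) (string, error) {
	payload, err := json.Marshal(cmd)
	return string(payload), err
}

func main() {
	payload, err := Encode(Command{
		CommandTopic: "eth.wallet.command", // used for routing only, never serialized
		Command:      "create_account",
		Data:         map[string]interface{}{"symbol": "eth"},
		Meta:         map[string]interface{}{"account_id": 1},
		ReplyTopic:   "eth.wallet.command.reply",
	})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(payload)
}
```

CommandTopic is tagged with json:"-", so it only tells the producer where to send the message, and ReplyTopic disappears from the payload when it is empty.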

Your connection to the message queue can then be outlined in three main points:

  1. Define a structure that knows how to handle incoming transactions, confirmations or command replies.
  2. Listen to every *.transaction, *.transaction.confirmed and *.command.reply topic, as well as the errors topic.
  3. Process each type of message received.
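
One way to sketch the routing step is a function that maps a topic name to the chain symbol and the kind of message it carries. This assumes the "<symbol>.wallet.<suffix>" naming convention from the configuration file; Classify and the MessageKind constants are hypothetical names, and a real implementation could just as well look the topic up in the chains configuration.

```go
package main

import (
	"fmt"
	"strings"
)

// MessageKind names the handler a message should be routed to.
type MessageKind string

const (
	KindTransaction  MessageKind = "transaction"
	KindConfirmation MessageKind = "confirmation"
	KindCommandReply MessageKind = "command_reply"
	KindError        MessageKind = "error"
	KindUnknown      MessageKind = "unknown"
)

// Classify splits a topic such as "eth.wallet.transaction.confirmed"
// into the chain symbol ("eth") and the kind of message it carries.
func Classify(topic string) (string, MessageKind) {
	if topic == "errors" {
		return "", KindError
	}
	suffixes := []struct {
		suffix string
		kind   MessageKind
	}{
		{".wallet.transaction.confirmed", KindConfirmation},
		{".wallet.transaction", KindTransaction},
		{".wallet.command.reply", KindCommandReply},
	}
	for _, s := range suffixes {
		if strings.HasSuffix(topic, s.suffix) {
			return strings.TrimSuffix(topic, s.suffix), s.kind
		}
	}
	return "", KindUnknown
}

func main() {
	for _, topic := range []string{"eth.wallet.transaction", "btc.wallet.command.reply", "errors"} {
		chain, kind := Classify(topic)
		fmt.Printf("%s -> chain=%q kind=%s\n", topic, chain, kind)
	}
}
```

A consumer loop can switch on the returned kind and call the transaction, confirmation or reply handler accordingly.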

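queue/index.go

package queue

import (
	"encoding/json"
	"errors"
	"log"

	"vault_manager/config"
	"vault_manager/net"

	"github.com/Shopify/sarama"
)

// queue implements the Queue interface on top of a Kafka producer and consumer
type queue struct {
	config             map[string]config.ChainConfig
	producer           net.KafkaProducer
	consumer           net.KafkaConsumer
	pending            map[string]chan map[string]interface{}
	transactionHandler func(*Transaction) error
	confirmTransaction func(*Transaction) error
}

// NewQueue constructor
func NewQueue(producer net.KafkaProducer, consumer net.KafkaConsumer, cfg map[string]config.ChainConfig) Queue {
	return &queue{
		producer: producer,
		consumer: consumer,
		config:   cfg,
		pending:  make(map[string]chan map[string]interface{}),
	}
}

// SetTransactionHandler sets the handler for any incoming transaction
func (q *queue) SetTransactionHandler(transactionHandler func(*Transaction) error) {
	q.transactionHandler = transactionHandler
}

// SetConfirmationHandler sets the handler for any confirmed transaction
func (q *queue) SetConfirmationHandler(confirmationHandler func(*Transaction) error) {
	q.confirmTransaction = confirmationHandler
}

// ProcessTransaction handles a message from a "<chain_symbol>.wallet.transaction" topic:
// it creates a new transaction object from the payload and passes it to the handler
func (q *queue) ProcessTransaction(symbol string, msg *sarama.ConsumerMessage) {
	if q.transactionHandler == nil {
		log.Println("No transaction handler set, skipping message from", msg.Topic)
		return
	}
	transaction, err := parseTransaction(msg.Value)
	if err != nil {
		log.Println("Error parsing transaction", err, string(msg.Value))
		return
	}
	// run the handler in the background and log its error instead of dropping it
	go func() {
		if err := q.transactionHandler(transaction); err != nil {
			log.Println("Error handling transaction", transaction.TxID, err)
		}
	}()
}

// parseTransaction decodes the JSON payload and checks that every expected field is a string,
// so that a malformed message cannot crash the consumer with a failed type assertion
func parseTransaction(payload []byte) (*Transaction, error) {
	data := make(map[string]interface{})
	if err := json.Unmarshal(payload, &data); err != nil {
		return nil, err
	}
	fields := make(map[string]string, 4)
	for _, key := range []string{"symbol", "txid", "value", "to"} {
		value, ok := data[key].(string)
		if !ok {
			return nil, errors.New("missing or invalid field: " + key)
		}
		fields[key] = value
	}
	return &Transaction{
		Symbol: fields["symbol"],
		TxID:   fields["txid"],
		Value:  fields["value"],
		To:     fields["to"],
	}, nil
}

// other methods from the interface
// Listen()
// Execute(command Command) error
// ExecuteAndWait(command Command) (chan map[string]interface{}, error)
// ...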

Now that we have incoming transactions and we can execute commands on the wallets, we can start building the API and handling the incoming messages.

API Endpoints

We start off with a simple HTTP server written with the help of Gin. The server looks like this:

server/http.go

package server

import (
	cgf "vault_manager/config"
	"vault_manager/net"
	"vault_manager/queue"
	"vault_manager/model"
	"github.com/gin-gonic/gin"
)

// Server interface
type Server interface {
	Listen()
}

// server structure
type server struct {
	config    cgf.Config
	queue     queue.Queue
	repo      *model.Repo
	consumers map[string]net.KafkaConsumer
	producers map[string]net.KafkaProducer
}

// NewServer creates a new server instance that can process the requests
func NewServer(config cgf.Config) Server {
	// init producers
	producers := make(map[string]net.KafkaProducer, len(config.Brokers.Producers))
	for key, brokerCfg := range config.Brokers.Producers {
		producers[key] = NewProducer(brokerCfg)
	}

	// init consumers
	consumers := make(map[string]net.KafkaConsumer, len(config.Brokers.Consumers))
	for key, brokerCfg := range config.Brokers.Consumers {
		consumers[key] = NewConsumer(brokerCfg)
	}
  
	// setup database connection
	dbConf := config.Database
	repo := model.NewRepo(dbConf.Host, dbConf.Port, dbConf.Username, dbConf.Password, dbConf.Name)

	// connect the queue to the right producer and consumer
	q := queue.NewQueue(producers["commands"], consumers["events"], config.Chains)

	// return the server structure
	return &server{
		config:    config,
		queue:     q,
		repo:      repo,
		producers: producers,
		consumers: consumers,
	}
}

// Listen registers the routes and starts serving requests from the API.
// The method is named Listen so that server satisfies the Server interface.
func (srv *server) Listen() {
	r := gin.Default()
	srv.queue.SetTransactionHandler(func(transaction *queue.Transaction) error {
		return srv.HandleDepositTransaction(transaction.Symbol, transaction.TxID, transaction.Value, transaction.To)
	})

	// I should be able to add a new address for an existing chain
	r.POST("/address", srv.CreateAddress)
	r.GET("/address", srv.ListAddresses)
	r.GET("/address/:address_id/balance", srv.GetAddressBalances)

	// I should be able to withdraw funds from a generated address
	r.POST("/withdraw/address/:address_id", srv.Withdraw)

	r.Run() // listen and serve on 0.0.0.0:8080 unless the PORT environment variable is set
}

To complete our API, we need to implement the CreateAddress, ListAddresses, GetAddressBalances and Withdraw methods.

Let's look at each of them individually and start with something simple like the ListAddresses. For this action, we only need to list what we have in the database.

server/address.go

package server

import (
	"errors"
	"time"

	"vault_manager/model"
	"vault_manager/queue"

	"github.com/gin-gonic/gin"
)

// ListAddresses lists the available addresses for the user
func (srv *server) ListAddresses(c *gin.Context) {
	// get the account by the account id from the x-api-key header
	account, err := model.GetAccountByAPIKey(c.GetHeader("x-api-key")) // you can define this in the model package
	// abort if no api key provided or account is not found
	if err != nil || account.ID == 0 {
		c.AbortWithError(401, errors.New("Unauthorized"))
		return
	}
	// query the database for any address that belongs to the account
	addresses := make([]model.Address, 0)
	db := srv.repo.Conn.Where("account_id = ?", account.ID).Find(&addresses)
	// abort with the query error if the query failed
	if db.Error != nil {
		c.AbortWithError(500, db.Error)
		return
	}
	// return the results
	c.JSON(200, addresses)
}

Generating deposit addresses is relatively simple as well, but we need to also send a request through the message queue to the appropriate wallet manager.

server/address.go

// CreateAddress creates a new deposit address for the user
func (srv *server) CreateAddress(c *gin.Context) {
	// get the account by the account id from the x-api-key header
	account, err := model.GetAccountByAPIKey(c.GetHeader("x-api-key")) // you can define this in the model package
	// abort if no api key provided or account is not found
	if err != nil || account.ID == 0 {
		c.AbortWithError(401, errors.New("Unauthorized"))
		return
	}
	// make sure the requested chain is one of the chains from the configuration file
	symbol := c.PostForm("chain_symbol")
	chain, supported := srv.config.Chains[symbol]
	if !supported {
		c.AbortWithError(400, errors.New("Unsupported chain"))
		return
	}
	// send the request to the wallet through the message queue and return a reply channel where we can wait for a response
	replyChan, err := srv.queue.ExecuteAndWait(queue.Command{
		Command:      "create_account",
		CommandTopic: chain.Command,
		Data: map[string]interface{}{
			"symbol": symbol,
		},
		Meta: map[string]interface{}{
			"account_id": account.ID,
		},
	})
	if err != nil {
		c.AbortWithError(500, err)
		return
	}

	// wait for the response from the wallet or reply to the caller with a timeout error
	select {
	case data := <-replyChan:
		// on a success reply from the wallet save the new address in the database for the account
		publicKey, ok := data["address"].(string)
		if !ok {
			c.AbortWithError(500, errors.New("Invalid reply from wallet"))
			return
		}
		address, err := model.CreateAddress(publicKey, symbol, account.ID) // you can define this in the model package
		if err != nil {
			c.AbortWithError(500, err)
			return
		}
		// reply with the new generated address
		c.JSON(201, address)
	case <-time.After(30 * time.Second):
		// timeout
		c.AbortWithError(504, errors.New("Request timeout"))
	}
}
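
Every handler in this article starts with the same x-api-key lookup. As a sketch of how that check could be moved into one place, the example below uses the standard library's net/http instead of Gin so that it runs without dependencies; Gin offers the same idea through middleware registered on the router. RequireAPIKey, the accounts map and StatusFor are hypothetical names, and the in-memory map stands in for model.GetAccountByAPIKey.

```go
package main

import (
	"context"
	"fmt"
	"net/http"
	"net/http/httptest"
)

type contextKey string

const accountKey contextKey = "account_id"

// accounts stands in for the database lookup done by model.GetAccountByAPIKey.
var accounts = map[string]uint{"demo-key": 1}

// RequireAPIKey rejects requests without a known x-api-key header and
// stores the account id in the request context for the next handler.
func RequireAPIKey(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		accountID, ok := accounts[r.Header.Get("x-api-key")]
		if !ok {
			http.Error(w, "Unauthorized", http.StatusUnauthorized)
			return
		}
		ctx := context.WithValue(r.Context(), accountKey, accountID)
		next.ServeHTTP(w, r.WithContext(ctx))
	})
}

// StatusFor sends an in-memory request through the middleware and
// returns the resulting status code; no network listener is needed.
func StatusFor(apiKey string) int {
	handler := RequireAPIKey(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintf(w, "account %v", r.Context().Value(accountKey))
	}))
	req := httptest.NewRequest(http.MethodGet, "/address", nil)
	req.Header.Set("x-api-key", apiKey)
	rec := httptest.NewRecorder()
	handler.ServeHTTP(rec, req)
	return rec.Code
}

func main() {
	fmt.Println(StatusFor("demo-key"), StatusFor("wrong-key"))
}
```

With the check in one place, each handler only has to read the account from the context.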

Displaying the balance available for each address involves adding the GetAddressBalances method, querying the database, and displaying the information in an easy-to-process format.

But in order to have some data in the address_balances table, we also need to start processing the events coming from the blockchain: we have to add records to the address_events table and update the balances based on these events.

Listing the balances looks like this:

server/account.go

// GetAddressBalances returns the balances of an address grouped by coin
func (srv *server) GetAddressBalances(c *gin.Context) {
	// get the account by the account id from the x-api-key header
	account, err := model.GetAccountByAPIKey(c.GetHeader("x-api-key")) // you can define this in the model package
	// abort if no api key provided or account is not found
	if err != nil || account.ID == 0 {
		c.AbortWithError(401, errors.New("Unauthorized"))
		return
	}
	// the address id is part of the route: /address/:address_id/balance
	addressID := c.Param("address_id")
	// load balances from the database
	addressBalances := make([]model.AddressBalance, 0)
	db := srv.repo.Conn.Where("address_id = ?", addressID).Find(&addressBalances)
	if db.Error != nil {
		c.AbortWithError(500, db.Error)
		return
	}
	// display them in an easy to manage format
	balances := make(map[string]interface{}, len(addressBalances))
	for _, balance := range addressBalances {
		balances[balance.Coin] = map[string]interface{}{
			"balance":       balance.Balance,
			"lockedBalance": balance.LockedBalance,
		}
	}
	c.JSON(200, balances)
}
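
To make the response format concrete, here is a small standalone sketch that groups balance rows by coin and encodes them the same way the handler does. The trimmed-down AddressBalance type, the FormatBalances helper and the sample coins and amounts are assumptions made for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// AddressBalance keeps only the fields of the model that the endpoint exposes.
type AddressBalance struct {
	Coin          string
	Balance       string
	LockedBalance string
}

// FormatBalances is a hypothetical helper that groups the rows by coin
// and returns the JSON body of the response.
func FormatBalances(rows []AddressBalance) (string, error) {
	balances := make(map[string]map[string]string, len(rows))
	for _, row := range rows {
		balances[row.Coin] = map[string]string{
			"balance":       row.Balance,
			"lockedBalance": row.LockedBalance,
		}
	}
	payload, err := json.Marshal(balances)
	return string(payload), err
}

func main() {
	body, err := FormatBalances([]AddressBalance{
		{Coin: "ETH", Balance: "1.5", LockedBalance: "0.2"},
		{Coin: "OMG", Balance: "10", LockedBalance: "0"},
	})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(body)
	// prints {"ETH":{"balance":"1.5","lockedBalance":"0.2"},"OMG":{"balance":"10","lockedBalance":"0"}}
}
```

Amounts stay strings all the way to the client, which avoids rounding them through JSON numbers.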

Back when we defined the HTTP server, we also added a handler for transactions under the HandleDepositTransaction(symbol, txid, amount, to string) method.

We can now implement it and save the events in the database.

server/account.go

// HandleDepositTransaction saves a deposit event for the receiving address and updates its balance
func (srv *server) HandleDepositTransaction(symbol, txid, amount, to string) error {
	address, err := model.GetAddressByPublicKey(to) // @todo add method in the model
	if err != nil {
		return err
	}

	// add address event (NewAddressEvent already returns a pointer)
	addressEvent := model.NewAddressEvent(address.ID, "deposit", symbol, amount, txid, "", "0", symbol)
	if err := srv.repo.Conn.Create(addressEvent).Error; err != nil {
		return err
	}

	// reflect the latest event on the balance
	// this can either be done here or in a separate section in order to make the
	// event generation and the balance update completely independent
	return srv.repo.ApplyEventOnBalance(addressEvent)
}

Here we've processed a new transaction event from Kafka and saved it in the database. From there, we need to update the balance of the user to reflect the new deposit.

In a live production vault manager, this step is a bit more complex: we first add a depositing event when the transaction is first processed on the blockchain, and then a deposited event after 120 blocks have passed and we can consider the transaction final.

The rest of the code is very similar to the above. Now let's look at how to apply the new event on the user balance.

model/repo_address_balance.go

package model

import "math/big"

// ApplyEventOnBalance updates the stored balance of an address based on the type of the event
func (repo *Repo) ApplyEventOnBalance(addressEvent *AddressEvent) error {
	// load the balance for the address and coin of the event and create it if it does not exist
	var addressBalance AddressBalance
	query := repo.Conn.Where("address_id = ? and coin = ?", addressEvent.AddressID, addressEvent.Symbol).First(&addressBalance)
	if query.RecordNotFound() {
		// the chain of the new balance is taken from the address itself
		var address Address
		if err := repo.Conn.Where("id = ?", addressEvent.AddressID).First(&address).Error; err != nil {
			return err
		}
		addressBalance = *NewAddressBalance(addressEvent.Symbol, address.Chain, address.ID, addressEvent.ID, "0", "0")
		if err := repo.Conn.Create(&addressBalance).Error; err != nil {
			return err
		}
	} else if query.Error != nil {
		return query.Error
	}

	addressBalance.LastEventID = addressEvent.ID
	balance := ConvertStringToBigFloat(addressBalance.Balance)
	lockedBalance := ConvertStringToBigFloat(addressBalance.LockedBalance)
	amount := ConvertStringToBigFloat(addressEvent.Amount)

	switch addressEvent.Type {
	case "deposit":
		// apply the deposit event on the address balance
		balance.Add(balance, amount)

	case "withdraw_request":
		// apply a withdraw request on the balance and lock the funds
		lockedBalance.Add(lockedBalance, amount)
		balance.Sub(balance, amount)

	case "withdraw":
		// complete the withdraw request and remove the event amount from the locked balance
		lockedBalance.Sub(lockedBalance, amount)

	default:
		// unknown event types do not change the balance
		return nil
	}

	// Text('f', -1) prints the value in plain decimal notation;
	// String() would round it to 10 significant digits
	addressBalance.Balance = balance.Text('f', -1)
	addressBalance.LockedBalance = lockedBalance.Text('f', -1)
	return repo.Conn.Save(&addressBalance).Error
}

// ConvertStringToBigFloat parses a decimal string and returns 0 if the string is not a valid number
func ConvertStringToBigFloat(num string) *big.Float {
	value, valid := new(big.Float).SetString(num)
	if !valid {
		return new(big.Float)
	}
	return value
}
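
A note on precision: a big.Float created with new(big.Float) and SetString gets a 64-bit mantissa, and its String method formats the value with 10 significant digits. One common alternative, sketched below as an assumption rather than as what the original project does, is to store amounts as integer strings in the smallest unit of the coin (wei, satoshi) and do the arithmetic with big.Int. AddBaseUnits is a hypothetical helper.

```go
package main

import (
	"fmt"
	"math/big"
)

// AddBaseUnits adds two amounts expressed as integer strings in the
// smallest unit of a coin. It rejects anything that is not a whole number.
func AddBaseUnits(a, b string) (string, error) {
	x, ok := new(big.Int).SetString(a, 10)
	if !ok {
		return "", fmt.Errorf("invalid amount %q", a)
	}
	y, ok := new(big.Int).SetString(b, 10)
	if !ok {
		return "", fmt.Errorf("invalid amount %q", b)
	}
	return x.Add(x, y).String(), nil
}

func main() {
	// 1 ETH expressed in wei, plus 1 wei: big.Int keeps every digit
	sum, err := AddBaseUnits("1000000000000000000", "1")
	fmt.Println(sum, err)

	// the same value through big.Float's default formatting is printed
	// in exponent notation, rounded to 10 significant digits
	f, _ := new(big.Float).SetString("1000000000000000001")
	fmt.Println(f.String())
}
```

Returning an error for invalid input, instead of silently treating it as zero, also makes bad amounts visible early.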

The ApplyEventOnBalance method is very simple. It loads or creates a balance for the address and symbol pair of the address event and based on the event type it adds or subtracts from the balance or from the locked balance field (depending on the case).

Since we aim to keep things simple in this tutorial, we won't process deposits in two steps, so the deposit amount is added directly to the Balance field. For withdrawals, however, we illustrate how the process should work in a finished product.

When the withdraw request is initialised, the amount of the event is moved from the available balance to the locked balance field. When the withdraw request is completed successfully, the funds are cleared from the locked balance, since there should be a successful blockchain transaction at this point.

For deposits with confirmation, it should work in a similar way, but in reverse.

On a new blockchain transaction, the funds are added to the locked balance amount and when the transaction is confirmed the funds are moved from LockedBalance to Balance so that the user can interact with them.

The last action we're adding in order to have a complete vault manager is the withdraw action, which is defined below.

server/withdraw.go

package server

import (
	"errors"
	"log"

	"vault_manager/model"

	"github.com/gin-gonic/gin"
)

// Withdraw sends funds from one of the account's addresses to an external address
func (srv *server) Withdraw(c *gin.Context) {
	// get the account by the account id from the x-api-key header
	account, err := model.GetAccountByAPIKey(c.GetHeader("x-api-key")) // you can define this in the model package
	// abort if no api key provided or account is not found
	if err != nil || account.ID == 0 {
		c.AbortWithError(401, errors.New("Unauthorized"))
		return
	}
	// the address from which to withdraw the funds is part of the route: /withdraw/address/:address_id
	addressID := c.Param("address_id")

	address, err := model.GetAddressByID(addressID) // @todo add method in the model
	if err != nil {
		c.AbortWithStatusJSON(404, map[string]interface{}{
			"success": false,
			"error":   err.Error(),
		})
		return
	}
	if address.AccountID != account.ID {
		c.AbortWithStatusJSON(404, map[string]interface{}{
			"success": false,
			"error":   "Address not found",
		})
		return
	}

	amount := c.PostForm("amount") // the amount of coins to withdraw
	symbol := c.PostForm("symbol") // the type of coin you want to withdraw
	to := c.PostForm("to")         // the address to withdraw funds to

	// create the withdraw events and send it to kafka for processing and wait for a transaction id
	addressEvent, err := srv.processWithdraw(address, amount, symbol, to)
	// send response to the user based on the address event
	if err != nil {
		c.AbortWithStatusJSON(500, map[string]interface{}{
			"success": false,
			"error":   err.Error(),
			"txid":    addressEvent.TxID,
		})
		return
	}
	c.JSON(200, map[string]interface{}{
		"success": true,
		"txid":    addressEvent.TxID,
	})
}

// process withdraw request and return the created AddressEvent with the transaction id and an error
func (srv *server) processWithdraw(address *model.Address, amount, symbol, to string) (*model.AddressEvent, error) {
	// the withdraw_request event locks the funds while the wallet sends the transaction
	event := model.NewAddressEvent(address.ID, "withdraw_request", symbol, amount, "", "", "0", symbol)

	// begin a database transaction and run every query of the request through it
	tx := srv.repo.Conn.Begin()
	if tx.Error != nil {
		return event, tx.Error
	}
	txRepo := &model.Repo{Conn: tx}

	// check if the current balance has enough funds to execute the withdraw
	var addressBalance model.AddressBalance
	query := tx.Where("address_id = ? and coin = ?", address.ID, symbol).First(&addressBalance)
	if query.Error != nil && !query.RecordNotFound() {
		tx.Rollback()
		return event, query.Error
	}

	balance := model.ConvertStringToBigFloat(addressBalance.Balance)
	requestedAmount := model.ConvertStringToBigFloat(amount)
	if requestedAmount.Sign() <= 0 {
		tx.Rollback()
		return event, errors.New("Invalid amount")
	}
	if balance.Cmp(requestedAmount) < 0 {
		tx.Rollback()
		return event, errors.New("Insufficient funds available")
	}

	// save the withdraw_request event for the account address and lock the funds
	if err := txRepo.Create(event); err != nil {
		tx.Rollback()
		return event, err
	}
	if err := txRepo.ApplyEventOnBalance(event); err != nil {
		tx.Rollback()
		return event, err
	}

	// send a withdraw request to the wallet via the kafka producer
	// and wait for a response message with the transaction id
	txid, err := srv.SendWithdraw(address.PublicKey, to, symbol, amount)
	event.TxID = txid
	if err != nil {
		tx.Rollback()
		log.Println("Unable to execute withdrawal request. Failed from wallet with", err)
		return event, err
	}
	if err := tx.Commit().Error; err != nil {
		log.Println("Unable to commit withdrawal request after payment was sent. Database connection issue? Transaction [", symbol, "] ", txid, err)
		return event, err
	}

	// in case of a successful withdrawal request
	// create the withdraw event for the account address
	withdrawEvent := model.NewAddressEvent(address.ID, "withdraw", symbol, amount, txid, "", "0", symbol)
	if err := srv.repo.Create(withdrawEvent); err != nil {
		log.Println("Error adding the withdraw event for the account wallet", err)
		return event, err
	}
	if err := srv.repo.ApplyEventOnBalance(withdrawEvent); err != nil {
		return event, err
	}

	return event, nil
}

For a withdrawal we need to check that the user has enough funds; otherwise the request will fail on the blockchain.

The processWithdraw method above handles most of the business logic for this.

First, it checks the balance and adds the withdraw_request event we discussed earlier. Then, it sends the request using the SendWithdraw method through Kafka to the Ethereum Wallet Manager for processing.

The last step is checking whether we receive back a transaction id or an error, and adding the withdraw event in case of success.

After each step we also update the balance in order to reflect the changes in the user account.

This tutorial is by no means complete but I hope it gives you enough insight into how to build a vault manager for your application.

Depending on your use case, you can develop the vault manager discussed here to support other wallets by:

  • extending the configuration file
  • improving the schema and the project architecture
  • implementing the missing functions that would have made this long article even longer
  • adding support for tokens which present their own set of challenges

The implementation in this tutorial was taken from one of our in-house blockchain components, then simplified and adapted to illustrate the concepts behind a vault manager.


I hope you found it inspiring and helpful in building your own implementations for a crypto trading platform.

If you need any help, we are always happy to connect and help in any capacity.
