gRPC with Go - Bidirectional Streams

gRPC with Go Crash Course – Bidirectional Streams

This article is part of the series gRPC With Go Crash Course

In this part of the course, you will implement the most challenging RPC routine type which gRPC has to offer – bidirectional streams.

These routines are used to achieve bidirectional client-server communication – meaning that communication can flow both ways. A way to achieve this using traditional HTTP technologies is via web sockets. With gRPC, you have support for such communication natively.

The Starting Point

There are two versions of this exercise – an easier one and a harder one.

The easier exercise will have some code written for you out-of-the-box. The code involves some Go multithreading. If you are not comfortable with goroutines & channels in Go, then I’d suggest you take the easier route.

However, if you want a greater challenge, then I’d suggest taking the harder route, where you’ll have to write out all the logic yourself.

If you completed the previous exercise, you can continue from where you left off. However, in that case you’ll have to go with the harder version of the exercise. 

If you don’t want to go through this hassle, simply clone the easy version branch as shown below.

To get started, run this command to clone the necessary exercise materials in a convenient folder.

For the easier version of the exercise:

git clone --branch v5-bidirectional-streams-easy https://github.com/preslavmihaylov/go-grpc-crash-course

For the harder version:

git clone --branch v5-bidirectional-streams-hard https://github.com/preslavmihaylov/go-grpc-crash-course


NOTE: If you are using GOPATH mode, you should clone the repo exactly in <your-gopath>/src/github.com/preslavmihaylov/go-grpc-crash-course.


Next up, install the libraries you’ll need by executing this at the heart of the repo:

go mod download

This line will install the required dependencies in your $GOPATH.

Your Goal

In this final exercise, we’ll explore the most complex type of communication gRPC enables – bidirectional streams:

First, take a look around the scaffold and familiarize yourself with what you’re building.

You have three main folders – casino, client, payment_statements

  • casino – this contains the source code for a casino microservice. This will contain the bulk of the backend logic.
  • client – this is a client application, which connects to the casino service and allows the user to use the application via a CLI interface.
  • payment_statements – this contains the source code for a payment statements microservice. It will be solely responsible for generating a user’s payment statement given a list of payments. The casino service will connect to this one and create user payment statements upon request.

All the service schema are in the idl directory. This is where you’ll spend most of your time throughout this exercise.

Finally, you have make_protos.sh, which is a simple bash script which compiles all your protobuf schemas together.

This is a utility script, enabling you to more easily re-compile proto schemas, instead of having to dig in your bash history for the correct protoc command.

The IDLs & initial service integrations are already setup for you. Additionally, most RPC routines (BuyTokens, Withdraw, TokenBalance, GetPayments, GetPaymentStatement) are already implemented.

These were done in the previous exercises.

What we have left is to implement the final (and hardest) streaming RPC in the casino server – Gamble. You also have to write some code in the client to interact with that routine. 

For all the locations where you have to chime in, you have a TODO: Implement comment or panic in there.

Take a look around and see where all those locations are.

In this exercise, we’ll be focusing on implementing a bidirectional stream RPC.. This is a native gRPC mechanism to achieve bidirectional client-server communication.

If you want to attempt the exercise on your own, read on. Otherwise skip to the Full Walkthrough section.

What the routine has to do is maintain a gambling session open between client and server. The server will periodically send stock information to the client. 

The stock information contains the stock price (in tokens) of some stock – called AwesomeStock. The initial stock price is 10 (already implemented).

The user (client app) will see the stock information on their screen and they will be able to either buy or sell stocks and either use or receive tokens in their “wallet”. 

Whenever the server receives a user action, it will validate that the user can buy/sell stocks and change their tokens & stock balances (casinoServer.userToTokens and casinoServer.userToStocks).

This data is stored on the server-side.

If everything is completed well, this is what the final result should look like after you run all the binaries:

final application demo

NOTE: I’ve increased the stock info propagation interval to avoid making the gif excessively large!

I’m also using a smaller stock price than what you’d have as this gif is from an earlier version of the workshop.

Requirements (hard version)

Start by creating several helper functions to aid you with the concurrency-related procedures.

The first function you should create should look like this (in client/commands.go):

In this function, make an infinite for loop, which waits for something to come from the error channel (use a select).

If there isn’t any error, invoke handler(stream). If the handler returns an error, forward it through the error channel and exit the function.

Next, create two function with the same signature but different names, which will be passed in as handlers to the function you just wrote.

Finally, use all the functions you just wrote in the gamble function you’ve already got:

  • Create a new client stream via client.Gamble(context.Context)
    • Don’t forget to defer stream.CloseSend()
  • Create a buffered error channel with size 2
  • Concurrently start iterateStreamWithHandler twice and pass the necessary parameters
    • Use fetchGamblingInfo and sendUserGamblingAction as the final parameters
  • Finally, wait for any error on the error channel
    • If it’s nil, errStopGambling or io.EOF, return “finished gambling!” with no error
    • Otherwise, return no message and an error with message
      “gambling unexpectedly stopped: {error}”

After you do all this, proceed with the next section.

Requirements (easy version)

This section covers the requirements for the easy version of the application. It assumes you already have implemented the mechanisms covered in the previous section or that you’ve cloned the easy branch for this part of the course.

Implement stock price monitoring

In casino/main.go:

  • Periodically update stock price & send new stock info to client –
    The incrementAndSendStockPrice function
    • On every 10s (just use time.Sleep), you should randomly change the stock price (An int – casinoServer.stockPrice).
      • E.g. increment by a random value in range [1, 15) and decrement by a random value in range [1, 15)
    • Finally, send the stock price via the stream to the client. Use the structure casinopb.GambleInfo and stream.Send.

      In this function, don’t add any ActionResult (part of the structure) in the returned data
    • Example returned data:

In client/commands.go:

  • fetchGamblingInfo – Whenever the server sends a new casinopb.GambleInfo:
    • If the GambleType is casinopb.GambleType_STOCK_INFO:
      • Display the current stock price (using fmt.Println/Printf, rather than returning the data).
      • Format: {stock_name} – current stock price: {price}
    • If the GambleType is casinopb.GambleType_ACTION_RESULT:
      • Display the received message as-is.
      • The data is stored in gambleInfo.Result.Msg

Hints:

  • You don’t need to use any for loops in these functions as that is already implemented in iterateStreamWithHandler
    • Simply use stream.Send to send the result you’ve calculated & return from the functions
    • Same goes for the similar function in the client code
  • First implement incrementAndSendStockPrice in casino and fetchGamblingInfo in client.
    • After running all applications, use the gamble action to test that periodical stock price is being shown on the screen
    • For the other unimplemented functions, use time.Sleep and return nil to avoid panics or excessive CPU load due to infinite loops

      Example:

Handle user actions

In casino/main.go:

  • Handle incoming user actions (buy or sell stocks) from the stream –
    The handleUserGamblingAction function
    • Whenever new data is received from the client via the stream, it will be of type casinopb.Action.
      • It contains the User, ActionType and stocksCount
      • The action type can be casinopb.ActionType_BUY or casinopb.ActionType_SELL
    • If the action type is BUY and the user has enough tokens to buy the stock, decrease their token balance and increase their stocks balance.
      • Given:
        • casinoServer.userToTokens[“user”] == 50 
        • casinoServer.stockPrice == 15
      • When:
        • user wants to buy 3 stocks
      • Then:
        • casinoServer.userToTokens[“user”] = 5
        • casinoServer.userToStocks[“user”] = 3
    • If the action type is SELL and the user has enough stocks to sell, then increase their token balance by stockPrice * stocksToSell and decrease their stocks balance by stocksToSell.
      • Given:
        • casinoServer.userToStocks[“user”] = 3
        • casinoServer.userToTokens[“user”] = 15
        • casinoServer.stockPrice = 10
      • When:
        • user wants to sell 2 stocks
      • Then:
        • casinoServer.userToStocks[“user”] = 1
        • casinoServer.userToTokens[“user”] = 35
    • Finally, send “operation executed successfully”, packed in a casinopb.ActionResult structure through the stream, if all went well.
    • If there was a validation error (e.g. user doesn’t have enough tokens), send an informative message which shows to the user what’s wrong
    • For validating user input, utilize the hasEnoughTokens and hasEnoughStocks functions

In client/commands.go:

  • sendUserGamblingAction – ask the user for input via the promptUserForAction function and send the chosen action to the stream
    • If the second return value from the function is true, stop communicating with the server by returning errStopGambling
    • If the returned action is nil, don’t send anything through the stream and just continue with the session

Those are all the requirements you have to accomplish as part of this exercise.

In the following section, you’ll find a complete walkthrough of the exercise. Use it in case you get stuck!

Good luck!

 
For help, refer to the gRPC Go Tutorial.
In particular, check out the RouteChat routine which is implemented in both client and server.

Full Walkthrough

Remember! Before going through this walkthrough, make sure you’ve attempted to complete the exercise on your own. 

If you’re doing the easy version of the exercise, then start from step 3, or go through step 1 and 2 to understand the code which is already there for you a bit better.

Refer to it if you get stuck or are interested in how I did it.

No cheating! 🙈🙈🙈

Now that we’ve gotten that out of the way, let’s get started!

Step 1. Setup stream & supporting goroutines in casino

We’ll start by setting up the Gamble routine in casino/main.go.

Here, there is no need to initialize the stream whatsoever, it is already open at this point and you have to just start using it.

However, what we have to do is setup the supplemental goroutines which will handle the two different types of handlers for the stream. One will send stock information on a regular cadence, the other will handle incoming input from the client.

To achieve this, we’ll first create a supplemental function iterateStreamWithHandler, which will be called in two separate goroutines.

The reasoning is that there is some goroutine management code, unrelated to the business logic which will be duplicated across both goroutines.

It will, generally, look like this:

Now, what does this code do? 

Essentially, a buffered error channel is created and in the goroutine, we have an infinite loop which stops if anything comes on the error channel.

The error channel is buffered so that the goroutine doesn’t block upon sending data through the error channel. This can lead to a memory leak if there is no routine to accept the error on the channel.

Finally, if something goes wrong in the business logic, an error is sent through the error channel which will make all other goroutines stop and the Gamble session will end.

Since we have two goroutines, this code will be duplicated twice.

Instead, we can extract it in a function like this:

Additionally, we’ll have to declare the streamHandler type somewhere.

Just add it above main:

Having this, we can reuse the new function in our Gamble routine like so:

The functions in yellow haven’t been implemented yet.

Let’s add those:

That concludes the initial set up of the stream handling logic on the casino side.

The benefit of doing all this is that we avoid code duplication and we separate the business logic from the goroutine management code.

Step 2. Setup stream & supporting goroutines in client

Let’s do a similar setup in client/commands.go

The difference here is the name of the stream handlers, as well as some code to invoke the Gamble routine and open the initial stream.

The final error is handled differently here, just to avoid printing an error message to the user in case the gambling session completed normally.

The iterateStreamWithHandler function and the streamHandler type are the same as in casino. Just write the same code here as well.

We’ll just have to implement the functions in yellow which haven’t been added yet:

Step 3. Implement periodically sending & updating the stock price

Now that the multithreading mechanisms are set-up, we can just focus on getting our business logic right.

Let’s start by implementing incrementAndSendStockPrice in casino/main.go.

What we have to do is randomly change the stock price and send it through the stream on every 10s:

The return type & what is contained is per the requirements in the Your Goal section.

On the client-side, in client/commands.go, we’ll have to implement the fetchGamblingInfo function.

Here’ we’ll just receive the message sent by casino and display it differently, based on the message type:

And that’s all there is to it in terms of propagating the stock price.

Let’s test this out (I’ve increased stock info propagation speed for the demo):

stock price update test

Step 4. Implement handling for user gambling actions

In this final step, we’ll write the business logic for handling the user actions.

This time, we’ll start from the client in client/commands.go.

What we have to do, is implement sendUserGamblingAction function.

We already have the heavy lifting done for us via the promptUserForAction function. What it does is it reads the user action from input, handles invalid actions and etc.

Additionally, it returns a second value, which is a boolean indicating if the user wants to stop the gambling session. Finally, if the returned action is nil (happens when user doesn’t enter anything), we’ll not send anything through the stream.

Here’s how the implementation looks like:

Pretty simple, given that the prompt function is already done for us.

The server-side code, however, is a little bit more convoluted.

In casino/main.go, we’ll implement handleUserGamblingAction.

We’ll start by blocking for input on the stream and initializing some helper variables:

Next, we’ll handle the user action differently, depending on whether it’s a buy or sell action. We’ll also do some validation for each of the actions:

Finally, if we reached the end successfully, return a message indicating the operation was successful:

And that is the entire implementation for our Gamble routine!

Let’s make a final test that everything works well:

final application test

Finale

Congratulations. 👏👏👏

You’ve successfully completed the exercise. 

You should now have a good understanding of how to use bidirectional streams in gRPC.

You also had some good practice doing multithreaded programming in Go.

If you took the red pill… I mean, did the harder version of the exercise…

And if you completed all the previous exercises as well, you should now have a pretty good grasp of how gRPC works and how to utilize it for your production microservices.

If you want to view the final solution for this exercise, checkout the v6-finale branch.

Site Footer

BulgariaEnglish