I’ve read a lot about graph theory recently. Graphs have changed the world a lot. From simple representations to Bayesian networks via Markov chains, the applications are numerous.

Today I would like to imagine a graph as an execution workflow. Every node would be a runnable task, and every edge would be a dependency.

It is an important framework that may be used as an orchestrator for any model, and of course I am thinking a lot about TOSCA.

The use case

If we consider this very simple graph (example taken from the French Wikipedia page)

its corresponding adjacency matrix is:

its dimension is 8x8

For the lab, I will consider that each node has a simple task to do: wait for a random number of milliseconds (like Rob Pike’s boring function, see references).

Let’s GO

How will it work

Every node will run in its own goroutine. That much is settled. But how do I deal with concurrency?

Every goroutine will be launched at the start and will then wait for information.

It will have an input communication channel, and a conductor will feed this channel with enough information for the goroutine to decide whether it should run or not. This information is simply the up-to-date adjacency matrix: when a node is done, its row is set to zero.

Every goroutine will then check the adjacency matrix to see whether it still has a predecessor. Node N has none when every value in column N is 0; in that case it executes its step, otherwise it keeps waiting.
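To make the column check concrete, here is a minimal sketch. It assumes a plain [][]float64 instead of the mat64 matrix so that it only needs the standard library; the name hasPredecessor and the 3-node graph are made up for the illustration.

```go
package main

import "fmt"

// hasPredecessor reports whether node n still waits on another node.
// adj[i][j] == 1 means "node j depends on node i", so the pending
// predecessors of n are the non-zero entries of column n.
func hasPredecessor(adj [][]float64, n int) bool {
	for i := range adj {
		if adj[i][n] != 0 {
			return true
		}
	}
	return false
}

func main() {
	// Hypothetical 3-node graph: 0 -> 1 and 2 -> 1.
	adj := [][]float64{
		{0, 1, 0},
		{0, 0, 0},
		{0, 1, 0},
	}
	fmt.Println(hasPredecessor(adj, 0)) // false
	fmt.Println(hasPredecessor(adj, 1)) // true

	// Nodes 0 and 2 finish: the conductor zeroes their rows.
	adj[0] = make([]float64, 3)
	adj[2] = make([]float64, 3)
	fmt.Println(hasPredecessor(adj, 1)) // false
}
```

Zeroing the row of a finished node is what releases its successors: their columns lose one non-zero entry each.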

Once the execution of the task is over, the goroutine will feed another channel to tell the conductor that its job is done, and the conductor will then broadcast the information.

Example

In our example, nodes 3, 5, and 7 do not have any predecessor, so they will be able to run first.

  • (1) The conductor feeds the nodes with the matrix

  • (2) Every node gets the data and analyses the matrix

  • (3) Nodes 3, 5 and 7 have no predecessor (their column in the matrix sums to zero): they can run

  • (4) Nodes 3 and 5 are done, they inform the conductor

  • (5) The conductor updates the matrix. It fills rows 3 and 5 with zeros (the fourth and sixth rows, because our first node is 0)

  • (6) The conductor feeds the nodes with the matrix

  • (7) The nodes analyse the matrix

  • (8) Node 2 can run…
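The rounds above can be replayed without any goroutine. The sketch below is a simplification, not the orchestrator itself: it assumes that every runnable node of a round finishes before the next round starts (the real workers finish at random times), and it uses a [][]float64 built from the edges that appear in the matrix initialisation further down. The names waves and exampleGraph are hypothetical.

```go
package main

import "fmt"

// waves groups the nodes by the round in which they become runnable.
// adj[i][j] == 1 means node j depends on node i. adj is not modified.
func waves(adj [][]float64) [][]int {
	n := len(adj)
	m := make([][]float64, n)
	for i := range adj {
		m[i] = append([]float64(nil), adj[i]...)
	}
	done := make([]bool, n)
	var out [][]int
	for {
		var ready []int
		for j := 0; j < n; j++ {
			if done[j] {
				continue
			}
			free := true
			for i := 0; i < n; i++ {
				if m[i][j] != 0 {
					free = false
					break
				}
			}
			if free {
				ready = append(ready, j)
			}
		}
		if len(ready) == 0 {
			return out // everything ran, or the remaining nodes form a cycle
		}
		for _, id := range ready {
			done[id] = true
			for c := range m[id] {
				m[id][c] = 0 // what the conductor does when id reports back
			}
		}
		out = append(out, ready)
	}
}

// exampleGraph rebuilds the 8x8 matrix of the article from its edges.
func exampleGraph() [][]float64 {
	adj := make([][]float64, 8)
	for i := range adj {
		adj[i] = make([]float64, 8)
	}
	edges := [][2]int{{0, 1}, {0, 4}, {1, 6}, {3, 2}, {3, 6}, {5, 0}, {5, 1}, {5, 2}, {7, 6}}
	for _, e := range edges {
		adj[e[0]][e[1]] = 1
	}
	return adj
}

func main() {
	fmt.Println(waves(exampleGraph())) // [[3 5 7] [0 2] [1 4] [6]]
}
```

The first wave matches step (3) of the walk-through: nodes 3, 5 and 7 are the only ones whose column is empty at the start.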

The representation of the use case in go

Data representation

To keep it simple, I won’t use a list or a slice to represent the matrix; instead I will rely on the mat64 package.

A slice may be more efficient, but for now that is not an issue.

On top of that, I may later need to transpose the matrix or look for its eigenvalues, and this package implements the methods to do so. For clarity of the description, I didn’t use a float64 array to initialize the matrix.

// Allocate a zeroed 8×8 matrix
m := mat64.NewDense(8, 8, nil)
m.Set(0, 1, 1); m.Set(0, 4, 1) // row 0: nodes 1 and 4 depend on node 0
m.Set(1, 6, 1) // row 1
m.Set(3, 2, 1); m.Set(3, 6, 1) // row 3
m.Set(5, 0, 1); m.Set(5, 1, 1); m.Set(5, 2, 1) // row 5
m.Set(7, 6, 1) // row 7
fa := mat64.Formatted(m, mat64.Prefix("    "))
fmt.Printf("\nm = %v\n\n", fa)

The node execution function (run)

The node execution is performed by a run function that takes two arguments:

  • The ID of the node
  • The duration of the sleep it performs…

This function returns a channel that will be used to exchange a Message

func run(id int, duration time.Duration) <-chan Message { }

A Message is a structure that holds:

  • the id of the node that issued the message
  • a boolean which acts as a flag that says whether the node has already run
  • a wait channel which carries a matrix. This channel is the mechanism the conductor uses to communicate back to the node

type Message struct {
	id   int              // ID of the node that sent the message
	run  bool             // true once the node has run
	wait chan mat64.Dense // conductor-to-node channel carrying the matrix
}

The run function will launch a goroutine which will remain active thanks to a loop. This allows the run function to finish and return the channel as soon as possible, so that it can be used by the conductor.
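The body of run is not shown in this post, so what follows is only a sketch of how it could look under a few assumptions: Matrix is a stand-in for mat64.Dense (to stay within the standard library), the node first sends a message with run set to false so that the conductor learns its wait channel, and columnIsZero and demo are hypothetical helpers.

```go
package main

import (
	"fmt"
	"time"
)

// Matrix is a stand-in for mat64.Dense (assumption made for this sketch).
type Matrix [][]float64

type Message struct {
	id   int
	run  bool
	wait chan Matrix
}

// columnIsZero reports whether node n has no pending predecessor.
func columnIsZero(m Matrix, n int) bool {
	for i := range m {
		if m[i][n] != 0 {
			return false
		}
	}
	return true
}

// run starts the node's goroutine and returns its channel immediately.
func run(id int, duration time.Duration) <-chan Message {
	c := make(chan Message)
	wait := make(chan Matrix)
	go func() {
		// Announce ourselves so the conductor gets hold of the wait channel.
		c <- Message{id: id, run: false, wait: wait}
		for {
			m := <-wait // block until the conductor sends a matrix
			if !columnIsZero(m, id) {
				continue // a predecessor is still pending
			}
			time.Sleep(duration) // the "task"
			c <- Message{id: id, run: true, wait: wait}
			return
		}
	}()
	return c
}

// demo drives node 1 of a two-node graph (0 -> 1) by hand.
func demo() Message {
	c := run(1, 10*time.Millisecond)
	hello := <-c
	hello.wait <- Matrix{{0, 1}, {0, 0}} // node 0 pending: node 1 keeps waiting
	hello.wait <- Matrix{{0, 0}, {0, 0}} // node 0 done: node 1 may run
	return <-c
}

func main() {
	done := demo()
	fmt.Println(done.id, done.run) // 1 true
}
```

Because the work happens inside the goroutine, the call to run returns at once, which is what lets the conductor start all the nodes from a simple loop.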

The conductor

The conductor will be executed inside the main function in our example.

The first step is to launch as many run functions as needed.

There is no need to launch them in separate goroutines because, as explained before, the run function returns the channel immediately: the intelligence already lives in a goroutine.

for i := 0; i < n; i++ { // n is the dimension of the matrix
    cs[i] = run(i, time.Duration(rand.Intn(1e3))*time.Millisecond)
    ...

Then, now that our workers are launched and the communication channels exist, we should launch n “angel” goroutines that will take care of sending the matrix back to all the workers.

    ...
    node := <-cs[i]
    go func() {
        for {
            node.wait <- *m
        }
    }()
}

Then we collect all the messages sent back by the goroutines, process them, and update the matrix as soon as a goroutine has finished. I will use the fanIn function as described by Rob Pike in his Google I/O 2012 talk (see references) and then enter a for loop to get the results as soon as they arrive:

c := fanIn(cs...)
timeout := time.After(5 * time.Second)
for {
    select {
    case node := <-c:
        if node.run {
            fmt.Printf("%v has finished\n", node.id)
            // Zero its row in the matrix
            for col := 0; col < n; col++ {
                m.Set(node.id, col, 0)
            }
        }
    case <-timeout:
        fmt.Println("Timeout")
        return
    default:
        if mat64.Sum(m) == 0 {
            fmt.Println("All done!")
            return
        }
    }
}
fmt.Println("This is the end!")

Note: I have set up a timeout, just in case (reference)…

Note 2: I do not detail the fanIn function, which is described here
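For readers who do not want to follow the link, here is a sketch of a variadic fanIn in the spirit of Rob Pike's two-channel version. It is an assumption about the shape of the function, not a copy of the one in the repository; Message is trimmed down to the fields needed here, and emit and collect are hypothetical helpers for the demo.

```go
package main

import (
	"fmt"
	"sort"
)

// Message is a trimmed-down version of the article's struct.
type Message struct {
	id  int
	run bool
}

// fanIn multiplexes any number of input channels onto a single one.
// Each forwarding goroutine stops when its input channel is closed.
func fanIn(inputs ...<-chan Message) <-chan Message {
	c := make(chan Message)
	for _, in := range inputs {
		go func(in <-chan Message) {
			for msg := range in {
				c <- msg
			}
		}(in)
	}
	return c
}

// emit returns a channel that delivers one message and is then closed.
func emit(id int) <-chan Message {
	c := make(chan Message)
	go func() {
		c <- Message{id: id, run: true}
		close(c)
	}()
	return c
}

// collect merges n emitters and returns the ids received, sorted.
func collect(n int) []int {
	cs := make([]<-chan Message, n)
	for i := range cs {
		cs[i] = emit(i)
	}
	merged := fanIn(cs...)
	ids := make([]int, 0, n)
	for i := 0; i < n; i++ {
		ids = append(ids, (<-merged).id)
	}
	sort.Ints(ids) // arrival order is not deterministic
	return ids
}

func main() {
	fmt.Println(collect(3)) // [0 1 2]
}
```

The conductor only has to read from one channel, whatever the number of nodes, which keeps its select statement short.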

The test

Here is what I got when I launched the test:

go run orchestrator.go 
I am 7, and I am running
I am 3, and I am running
I am 5, and I am running
3 has finished
5 has finished
I am 2, and I am running
I am 0, and I am running
0 has finished
I am 1, and I am running
I am 4, and I am running
4 has finished
7 has finished
2 has finished
1 has finished
All done!

Pretty cool

The complete source can be found here.

If you want to play: download Go, set up a directory and a $GOPATH, then simply

go get github.com/owulveryck/gorchestrator
cd $GOPATH/src/github.com/owulveryck/gorchestrator
go run orchestrator.go

Conclusions

I’m really happy about this implementation. It is clear and concise, and not too far from idiomatic Go.

What I would like to do now:

  • Read a TOSCA file (again) and pass the adjacency matrix to the orchestrator. That would make a complete orchestrator for cheap.
  • Re-use an old implementation of the toscaviewer. The idea is to implement a web server that serves the matrix as a JSON stream. This JSON will be used to update the SVG (via jQuery), and then we would be able to see the progression in a graphical way.

STAY TUNED!!!

References