Initial commit.

Signed-off-by: Andrea Luzzardi <aluzzardi@gmail.com>
Author: Andrea Luzzardi
Date:   2014-11-07 16:41:33 -08:00
Commit: 05864a3b2a
9 changed files with 592 additions and 0 deletions

api/api.go (new file)
@@ -0,0 +1,108 @@
package api

import (
	"fmt"
	"log"
	"net/http"

	"github.com/gorilla/mux"
)

type HttpApiFunc func(w http.ResponseWriter, r *http.Request)

func ping(w http.ResponseWriter, r *http.Request) {
	w.Write([]byte{'O', 'K'})
}

func notImplementedHandler(w http.ResponseWriter, r *http.Request) {
	http.Error(w, "Not supported in clustering mode.", http.StatusNotImplemented)
}

// createRouter builds the API router. Every Docker remote API endpoint is
// registered, but for now most of them are stubbed out with
// notImplementedHandler.
func createRouter() (*mux.Router, error) {
	r := mux.NewRouter()
	m := map[string]map[string]HttpApiFunc{
		"GET": {
			"/_ping":                          ping,
			"/events":                         notImplementedHandler,
			"/info":                           notImplementedHandler,
			"/version":                        notImplementedHandler,
			"/images/json":                    notImplementedHandler,
			"/images/viz":                     notImplementedHandler,
			"/images/search":                  notImplementedHandler,
			"/images/get":                     notImplementedHandler,
			"/images/{name:.*}/get":           notImplementedHandler,
			"/images/{name:.*}/history":       notImplementedHandler,
			"/images/{name:.*}/json":          notImplementedHandler,
			"/containers/ps":                  notImplementedHandler,
			"/containers/json":                notImplementedHandler,
			"/containers/{name:.*}/export":    notImplementedHandler,
			"/containers/{name:.*}/changes":   notImplementedHandler,
			"/containers/{name:.*}/json":      notImplementedHandler,
			"/containers/{name:.*}/top":       notImplementedHandler,
			"/containers/{name:.*}/logs":      notImplementedHandler,
			"/containers/{name:.*}/attach/ws": notImplementedHandler,
		},
		"POST": {
			"/auth":                         notImplementedHandler,
			"/commit":                       notImplementedHandler,
			"/build":                        notImplementedHandler,
			"/images/create":                notImplementedHandler,
			"/images/load":                  notImplementedHandler,
			"/images/{name:.*}/push":        notImplementedHandler,
			"/images/{name:.*}/tag":         notImplementedHandler,
			"/containers/create":            notImplementedHandler,
			"/containers/{name:.*}/kill":    notImplementedHandler,
			"/containers/{name:.*}/pause":   notImplementedHandler,
			"/containers/{name:.*}/unpause": notImplementedHandler,
			"/containers/{name:.*}/restart": notImplementedHandler,
			"/containers/{name:.*}/start":   notImplementedHandler,
			"/containers/{name:.*}/stop":    notImplementedHandler,
			"/containers/{name:.*}/wait":    notImplementedHandler,
			"/containers/{name:.*}/resize":  notImplementedHandler,
			"/containers/{name:.*}/attach":  notImplementedHandler,
			"/containers/{name:.*}/copy":    notImplementedHandler,
			"/containers/{name:.*}/exec":    notImplementedHandler,
			"/exec/{name:.*}/start":         notImplementedHandler,
			"/exec/{name:.*}/resize":        notImplementedHandler,
		},
		"DELETE": {
			"/containers/{name:.*}": notImplementedHandler,
			"/images/{name:.*}":     notImplementedHandler,
		},
		"OPTIONS": {
			"": notImplementedHandler,
		},
	}

	for method, routes := range m {
		for route, fct := range routes {
			log.Printf("Registering %s, %s", method, route)

			// NOTE: scope issue, make sure the variables are local and
			// won't be changed by the next loop iteration.
			localRoute := route
			localFct := fct
			localMethod := method
			wrap := func(w http.ResponseWriter, r *http.Request) {
				fmt.Printf("-> %s %s\n", r.Method, r.RequestURI)
				localFct(w, r)
			}

			// Register the route twice: once with the API version prefix
			// and once without.
			r.Path("/v{version:[0-9.]+}" + localRoute).Methods(localMethod).HandlerFunc(wrap)
			r.Path(localRoute).Methods(localMethod).HandlerFunc(wrap)
		}
	}
	return r, nil
}

// ListenAndServe starts serving the API on the given address.
func ListenAndServe(addr string) error {
	r, err := createRouter()
	if err != nil {
		return err
	}
	s := &http.Server{
		Addr:    addr,
		Handler: r,
	}
	return s.ListenAndServe()
}
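Every route is registered twice, with and without the /v{version} prefix, so both /_ping and /v1.15/_ping should reach the ping handler. A minimal package-internal test sketch against httptest (this test is not part of the commit; assume it lives next to api.go):

package api

import (
	"io/ioutil"
	"net/http"
	"net/http/httptest"
	"testing"
)

func TestPingRoutes(t *testing.T) {
	r, err := createRouter()
	if err != nil {
		t.Fatal(err)
	}
	srv := httptest.NewServer(r)
	defer srv.Close()

	// Both the bare and the version-prefixed paths map to ping.
	for _, path := range []string{"/_ping", "/v1.15/_ping"} {
		resp, err := http.Get(srv.URL + path)
		if err != nil {
			t.Fatal(err)
		}
		body, _ := ioutil.ReadAll(resp.Body)
		resp.Body.Close()
		if string(body) != "OK" {
			t.Fatalf("GET %s: expected OK, got %q", path, body)
		}
	}
}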

cluster.go (new file)
@@ -0,0 +1,40 @@
package libcluster

import (
	"errors"
	"sync"
)

var (
	ErrNodeNotConnected      = errors.New("node is not connected to docker's REST API")
	ErrNodeAlreadyRegistered = errors.New("node was already added to the cluster")
)

type Cluster struct {
	mux   sync.Mutex
	nodes map[string]*Node
}

// NewCluster returns an empty cluster.
func NewCluster() *Cluster {
	return &Cluster{
		nodes: make(map[string]*Node),
	}
}

// AddNode registers a node within the cluster. The node must already be
// connected (see Node.Connect).
func (c *Cluster) AddNode(n *Node) error {
	if !n.IsConnected() {
		return ErrNodeNotConnected
	}
	c.mux.Lock()
	defer c.mux.Unlock()
	if _, exists := c.nodes[n.ID]; exists {
		return ErrNodeAlreadyRegistered
	}
	c.nodes[n.ID] = n
	return nil
}
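AddNode rejects nodes that were never connected as well as duplicate IDs. A minimal usage sketch of the first error path (the address is illustrative, not from this commit):

c := NewCluster()
n := NewNode("node-1", "tcp://192.0.2.10:4243")

// The node was never connected, so registration is refused.
if err := c.AddNode(n); err != ErrNodeNotConnected {
	panic("expected ErrNodeNotConnected")
}

// After a successful n.Connect(nil), adding a second node with the same
// ID would return ErrNodeAlreadyRegistered instead.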

container.go (new file)
@@ -0,0 +1,25 @@
package libcluster

import "github.com/samalba/dockerclient"

// Container is a dockerclient.Container bound to the node it runs on.
type Container struct {
	*dockerclient.Container
	node *Node
}

func (c *Container) Start() error {
	return c.node.client.StartContainer(c.Id, nil)
}

// Kill the container. NOTE: sig is currently ignored since the underlying
// client call does not take a signal.
func (c *Container) Kill(sig int) error {
	return c.node.client.KillContainer(c.Id)
}

// Stop the container, giving it 8 seconds to shut down before it is killed.
func (c *Container) Stop() error {
	return c.node.client.StopContainer(c.Id, 8)
}

func (c *Container) Restart(timeout int) error {
	return c.node.client.RestartContainer(c.Id, timeout)
}

event.go (new file)
@@ -0,0 +1,14 @@
package libcluster

import "time"

// Event describes a state change on a container within the cluster.
type Event struct {
	Type      string
	Container *Container
	Node      *Node
	Time      time.Time
}

// EventHandler receives events emitted by the nodes (see Node.Events).
type EventHandler interface {
	Handle(*Event) error
}
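EventHandler is the hook for reacting to container state changes; Node.Events in node.go accepts a single handler per node. As a hedged sketch, a trivial handler that only logs (logHandler is a made-up name for illustration, and the fragment assumes the standard log package is imported):

type logHandler struct{}

// Handle implements EventHandler by logging every event it receives.
func (h *logHandler) Handle(e *Event) error {
	log.Printf("[%s] %s at %s", e.Node.ID, e.Type, e.Time)
	return nil
}

// Registered with: node.Events(&logHandler{})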

image.go (new file)
@@ -0,0 +1,27 @@
package libcluster

import (
	"strings"
)

type ImageInfo struct {
	Name string
	Tag  string
}

// parseImageName parses an image name in the format of "name:tag" and
// returns an ImageInfo struct. If no tag is defined, assume "latest".
// NOTE: registry addresses that contain a port (e.g. host:5000/image) are
// not handled yet.
func parseImageName(name string) *ImageInfo {
	imageInfo := &ImageInfo{
		Name: name,
		Tag:  "latest",
	}
	img := strings.Split(name, ":")
	if len(img) == 2 {
		imageInfo.Name = img[0]
		imageInfo.Tag = img[1]
	}
	return imageInfo
}

image_test.go (new file)
@@ -0,0 +1,17 @@
package libcluster

import "testing"

func TestImageNameParsing(t *testing.T) {
	var i *ImageInfo

	i = parseImageName("foo:bar")
	if i.Name != "foo" || i.Tag != "bar" {
		t.Fatalf("Parsing failed: %#v", i)
	}

	i = parseImageName("foo")
	if i.Name != "foo" || i.Tag != "latest" {
		t.Fatalf("Parsing failed: %#v", i)
	}
}

node.go (new file)
@@ -0,0 +1,250 @@
package libcluster

import (
	"crypto/tls"
	"fmt"
	"log"
	"net"
	"strings"
	"sync"
	"time"

	"github.com/samalba/dockerclient"
)

const (
	// Force-refresh the state of the node this often.
	stateRefreshPeriod = 30 * time.Second
)

// NewNode creates a node with the given ID and Docker API address. Call
// Connect to actually start talking to it.
func NewNode(id string, addr string) *Node {
	return &Node{
		ID:     id,
		Addr:   addr,
		Labels: make(map[string]string),
		ch:     make(chan bool),
	}
}

type Node struct {
	ID     string
	IP     string
	Addr   string
	Cpus   int
	Memory int64
	Labels map[string]string

	mux          sync.Mutex
	ch           chan bool
	containers   map[string]*Container
	client       dockerclient.DockerClientInterface
	eventHandler EventHandler
}

// Connect will initialize a connection to the Docker daemon running on the
// host, gather machine specs (memory, cpu, ...) and monitor state changes.
func (n *Node) Connect(config *tls.Config) error {
	c, err := dockerclient.NewDockerClient(n.Addr, config)
	if err != nil {
		return err
	}

	addr, err := net.ResolveIPAddr("ip4", strings.Split(c.URL.Host, ":")[0])
	if err != nil {
		return err
	}
	n.IP = addr.IP.String()

	return n.connectClient(c)
}

func (n *Node) connectClient(client dockerclient.DockerClientInterface) error {
	n.client = client

	// Fetch the engine labels.
	if err := n.updateSpecs(); err != nil {
		n.client = nil
		return err
	}

	// Force a state update before returning.
	if err := n.updateState(); err != nil {
		n.client = nil
		return err
	}

	// Start the update loop.
	go n.updateLoop()

	// Start monitoring events from the Node.
	n.client.StartMonitorEvents(n.handler)

	return nil
}

// IsConnected returns true if the node is connected to a remote docker API.
func (n *Node) IsConnected() bool {
	return n.client != nil
}

// Gather node specs (CPU, memory, constraints, ...).
func (n *Node) updateSpecs() error {
	info, err := n.client.Info()
	if err != nil {
		return err
	}
	n.Cpus = info.NCPU
	n.Memory = info.MemTotal
	n.Labels = map[string]string{
		"graphdriver":     info.Driver,
		"executiondriver": info.ExecutionDriver,
		"kernelversion":   info.KernelVersion,
		"operatingsystem": info.OperatingSystem,
	}
	return nil
}

// Refresh the list and status of containers running on the node.
func (n *Node) updateState() error {
	containers, err := n.client.ListContainers(true)
	if err != nil {
		return err
	}

	n.mux.Lock()
	defer n.mux.Unlock()

	n.containers = make(map[string]*Container)
	for _, c := range containers {
		// Take a copy: &c would otherwise alias the loop variable and
		// every map entry would end up pointing at the same container.
		c := c
		n.containers[c.Id] = &Container{Container: &c, node: n}
	}

	log.Printf("[%s] Updated state", n.ID)
	return nil
}

func (n *Node) updateStateAsync() {
	n.ch <- true
}

func (n *Node) updateLoop() {
	for {
		select {
		case <-n.ch:
		case <-time.After(stateRefreshPeriod):
		}
		if err := n.updateState(); err != nil {
			log.Printf("[%s] State update failed: %v", n.ID, err)
		}
	}
}

// Create a container on the node. If the image is missing, it is pulled and
// the creation is retried once.
// TODO: pullImage is currently unused.
func (n *Node) Create(config *dockerclient.ContainerConfig, name string, pullImage bool) (*Container, error) {
	var (
		err    error
		id     string
		client = n.client
	)

	if id, err = client.CreateContainer(config, name); err != nil {
		// If the error is other than not found, abort immediately.
		if err != dockerclient.ErrNotFound {
			return nil, err
		}
		// Otherwise, try to pull the image...
		if err = n.Pull(config.Image); err != nil {
			return nil, err
		}
		// ...And try again.
		if id, err = client.CreateContainer(config, name); err != nil {
			return nil, err
		}
	}

	// Force a state refresh so the newly created container shows up right
	// away instead of waiting for the next refresh cycle.
	n.updateState()

	return n.containers[id], nil
}

func (n *Node) ListImages() ([]string, error) {
	images, err := n.client.ListImages()
	if err != nil {
		return nil, err
	}

	out := []string{}
	for _, i := range images {
		out = append(out, i.RepoTags...)
	}

	return out, nil
}

func (n *Node) Remove(container *Container, force bool) error {
	if err := n.client.RemoveContainer(container.Id, force); err != nil {
		return err
	}

	// Remove the container from the state. Eventually, the state refresh
	// loop will rewrite this.
	n.mux.Lock()
	defer n.mux.Unlock()
	delete(n.containers, container.Id)

	return nil
}

// Pull an image onto the node, defaulting to the "latest" tag if none is given.
func (n *Node) Pull(image string) error {
	imageInfo := parseImageName(image)
	if err := n.client.PullImage(imageInfo.Name, imageInfo.Tag); err != nil {
		return err
	}
	return nil
}

// Events registers an event handler. Only one handler can be set per node.
func (n *Node) Events(h EventHandler) error {
	if n.eventHandler != nil {
		return fmt.Errorf("event handler already set")
	}
	n.eventHandler = h
	return nil
}

func (n *Node) Containers() map[string]*Container {
	return n.containers
}

func (n *Node) String() string {
	return fmt.Sprintf("node %s addr %s", n.ID, n.Addr)
}

func (n *Node) handler(ev *dockerclient.Event, args ...interface{}) {
	// Something changed - refresh our internal state.
	n.updateState()

	// If there is no event handler registered, abort right now.
	if n.eventHandler == nil {
		return
	}

	event := &Event{
		Node:      n,
		Type:      ev.Status,
		Time:      time.Unix(int64(ev.Time), 0),
		Container: n.containers[ev.Id],
	}

	n.eventHandler.Handle(event)
}
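Since Create falls back to pulling the image on ErrNotFound and retries once, callers only need to write the happy path. A hedged end-to-end sketch (the address and image name are illustrative, and the fragment assumes the log and dockerclient packages are imported):

node := NewNode("node-1", "tcp://192.0.2.10:4243")
if err := node.Connect(nil); err != nil {
	log.Fatal(err)
}

// If redis:latest is missing on the node, Create pulls it and retries.
config := &dockerclient.ContainerConfig{Image: "redis:latest"}
container, err := node.Create(config, "redis-1", true)
if err != nil {
	log.Fatal(err)
}
log.Printf("created %s on %s", container.Id, node)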

node_test.go (new file)
@@ -0,0 +1,83 @@
package libcluster

import (
	"errors"
	"testing"

	"github.com/samalba/dockerclient"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/mock"
)

var (
	mockInfo = &dockerclient.Info{
		NCPU:            10,
		MemTotal:        20,
		Driver:          "driver-test",
		ExecutionDriver: "execution-driver-test",
		KernelVersion:   "1.2.3",
		OperatingSystem: "golang",
	}
)

func TestNodeConnectionFailure(t *testing.T) {
	node := NewNode("test", "test")
	assert.False(t, node.IsConnected())

	// Always fail.
	client := dockerclient.NewDockerClientMock()
	client.On("Info").Return(&dockerclient.Info{}, errors.New("fail"))

	// Connect() should fail and IsConnected() return false.
	assert.Error(t, node.connectClient(client))
	assert.False(t, node.IsConnected())

	client.Mock.AssertExpectations(t)
}

func TestNodeSpecs(t *testing.T) {
	node := NewNode("test", "test")
	assert.False(t, node.IsConnected())

	client := dockerclient.NewDockerClientMock()
	client.On("Info").Return(mockInfo, nil)
	client.On("ListContainers", true).Return([]dockerclient.Container{}, nil)
	client.On("StartMonitorEvents", mock.Anything, mock.Anything).Return()

	assert.NoError(t, node.connectClient(client))
	assert.True(t, node.IsConnected())

	assert.Equal(t, node.Cpus, mockInfo.NCPU)
	assert.Equal(t, node.Memory, mockInfo.MemTotal)
	assert.Equal(t, node.Labels["graphdriver"], mockInfo.Driver)
	assert.Equal(t, node.Labels["executiondriver"], mockInfo.ExecutionDriver)
	assert.Equal(t, node.Labels["kernelversion"], mockInfo.KernelVersion)
	assert.Equal(t, node.Labels["operatingsystem"], mockInfo.OperatingSystem)

	client.Mock.AssertExpectations(t)
}

func TestNodeState(t *testing.T) {
	node := NewNode("test", "test")
	assert.False(t, node.IsConnected())

	client := dockerclient.NewDockerClientMock()
	client.On("Info").Return(mockInfo, nil)
	client.On("StartMonitorEvents", mock.Anything, mock.Anything).Return()

	// The client will return one container at first, then a second one
	// will appear.
	client.On("ListContainers", true).Return([]dockerclient.Container{{Id: "one"}}, nil).Once()
	client.On("ListContainers", true).Return([]dockerclient.Container{{Id: "one"}, {Id: "two"}}, nil).Once()

	assert.NoError(t, node.connectClient(client))
	assert.True(t, node.IsConnected())

	// The node should only have a single container at this point.
	assert.Len(t, node.Containers(), 1)

	// Fake an event which will trigger a refresh. The second container
	// will appear.
	node.handler(&dockerclient.Event{Id: "two", Status: "created"})
	assert.Len(t, node.Containers(), 2)

	client.Mock.AssertExpectations(t)
}

swarmd/main.go (new file)
@@ -0,0 +1,28 @@
package main

import (
	"fmt"
	"log"
	"os"

	"github.com/docker/libcluster"
	"github.com/docker/libcluster/api"
)

func main() {
	if len(os.Args) < 2 {
		fmt.Printf("Usage: %s node1 node2 ...\n", os.Args[0])
		os.Exit(1)
	}

	c := libcluster.NewCluster()
	for _, addr := range os.Args[1:] {
		n := libcluster.NewNode(addr, addr)
		if err := n.Connect(nil); err != nil {
			log.Fatal(err)
		}
		if err := c.AddNode(n); err != nil {
			log.Fatal(err)
		}
	}

	// ListenAndServe only returns on error, so treat its result as fatal.
	log.Fatal(api.ListenAndServe(":4243"))
}
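Assuming two Docker daemons are reachable over TCP (the addresses below are illustrative), swarmd is pointed at both and the clustered API then answers on port 4243; only /_ping is actually implemented so far:

swarmd tcp://192.0.2.10:4243 tcp://192.0.2.11:4243
curl http://localhost:4243/_ping   # -> OK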