Compare commits

...

10 Commits

Author SHA1 Message Date
Siyu Wang 9460898fb7 Optimize wait timeout in broadcastjob e2e case (#587)
Signed-off-by: FillZpp <FillZpp.pub@gmail.com>
2021-04-08 16:25:32 +08:00
Siyu Wang 94c9e2644f Fix broadcastjob expectation observed when node assigned by scheduler (#584)
Signed-off-by: FillZpp <FillZpp.pub@gmail.com>
2021-04-08 16:25:23 +08:00
champly cc1b713cb8 remove redundant filterActiveHandlers invoke
Signed-off-by: champly <champly@outlook.com>
2021-04-08 16:25:15 +08:00
FillZpp 05f0304a18 Optimize cri-runtime for kruise-daemon
Signed-off-by: FillZpp <FillZpp.pub@gmail.com>
2021-04-08 15:12:54 +08:00
FillZpp ef7ba3cff1 Update new channel address and scripts
Signed-off-by: FillZpp <FillZpp.pub@gmail.com>
2021-04-08 15:11:52 +08:00
shyung b4f06c5ed4 fix a typo for README (#559)
Signed-off-by: shyung <634125750@qq.com>
2021-04-08 15:09:17 +08:00
天元 6bc4ba172b add a license checker CI
Signed-off-by: 天元 <jianbo.sjb@alibaba-inc.com>
2021-04-08 15:09:09 +08:00
shyung b4ce0c8a10 fix a typo for README
Signed-off-by: shyung <634125750@qq.com>
2021-04-08 15:09:01 +08:00
Wei Fu 1341448aeb imagepulljob: use containerd/errdefs instead of moby
fix: #547

Signed-off-by: Wei Fu <fuweid89@gmail.com>
2021-03-11 10:34:01 +08:00
FillZpp 26641ea9d5 Fix reserveOrdinals validation for StatefulSet
Signed-off-by: FillZpp <FillZpp.pub@gmail.com>
2021-03-09 21:07:03 +08:00
45 changed files with 2380 additions and 613 deletions

26
.github/workflows/license.yml vendored Normal file
View File

@ -0,0 +1,26 @@
name: License
on:
push:
branches:
- master
- release-*
workflow_dispatch: {}
pull_request:
branches:
- master
- release-*
jobs:
license_check:
runs-on: ubuntu-latest
name: Check for unapproved licenses
steps:
- uses: actions/checkout@v2
- name: Set up Ruby
uses: ruby/setup-ruby@v1
with:
ruby-version: 2.6
- name: Install dependencies
run: gem install license_finder
- name: Run tests
run: license_finder --decisions_file .license/dependency_decisions.yml

19
.license/README.md Normal file
View File

@ -0,0 +1,19 @@
# License Checker
Our license checker CI rely on [LicenseFinder](https://github.com/pivotal/LicenseFinder).
## How to add a new license
LicenseFinder is a ruby project, so make sure you have ruby installed.
### Install the tool
```shell
gem install license_finder
```
### Add a license
```shell
license_finder permitted_licenses add MIT --decisions_file .license/dependency_decisions.yml
```

View File

@ -0,0 +1,43 @@
---
- - :permit
- MIT
- :who:
:why:
:versions: []
:when: 2021-03-12 07:35:34.645031000 Z
- - :permit
- Apache 2.0
- :who:
:why:
:versions: []
:when: 2021-03-12 07:19:18.243194000 Z
- - :permit
- New BSD
- :who:
:why:
:versions: []
:when: 2021-03-12 07:19:28.540675000 Z
- - :permit
- Simplified BSD
- :who:
:why:
:versions: []
:when: 2021-03-12 07:20:01.774212000 Z
- - :permit
- Mozilla Public License 2.0
- :who:
:why:
:versions: []
:when: 2021-03-12 07:21:05.194536000 Z
- - :permit
- unknown
- :who:
:why:
:versions: []
:when: 2021-03-12 07:21:43.379269000 Z
- - :permit
- ISC
- :who:
:why:
:versions: []
:when: 2021-03-12 07:22:07.265966000 Z

View File

@ -12,7 +12,7 @@
|![notification](docs/img/bell-outline-badge.svg) 最新进展:|
|------------------|
|Feb 4th, 2021. Kruise v0.8.0 is 发布! 提供了重构版本的 SidecarSet、UnitedDeployment 支持管理 Deployment以及一个新的 kruise-daemon 组件目前支持镜像预热,详情参见 [CHANGELOG](CHANGELOG.md).|
|Mar 4th, 2021. Kruise v0.8.0 发布! 提供了重构版本的 SidecarSet、UnitedDeployment 支持管理 Deployment以及一个新的 kruise-daemon 组件目前支持镜像预热,详情参见 [CHANGELOG](CHANGELOG.md).|
|Dec 16th, 2020. Kruise v0.7.0 发布! 提供一个新的 AdvancedCronJob CRD、将 Advanced StatefulSet 升级 v1beta1 版本、以及其他控制器一些新增能力,详情参见 [CHANGELOG](CHANGELOG.md).|
|Oct 1st, 2020. Kruise v0.6.1 发布! 提供一系列增强 feature 和 bugfix 比如 CloneSet lifecycle hook 、UnitedDeployment 支持 CloneSet 等,详情参见 [CHANGELOG](CHANGELOG.md).|
@ -90,7 +90,7 @@ helm install kruise https://github.com/openkruise/kruise/releases/download/v0.8.
活跃的社区途径:
- Slack: [channel address](https://join.slack.com/t/kruise-workspace/shared_invite/enQtNjU5NzQ0ODcyNjYzLWJlZGJiZjUwNGU5Y2U2ODI3N2JiODI4N2M1OWFlOTgzMDgyOWVkZGRjNzdmZTBjYzgxZmM5MjAyNjhhZTdmMjQ)
- Slack: [Channel in Kubernetes Slack](https://kubernetes.slack.com/channels/openkruise)
- 钉钉讨论群
<div>

View File

@ -12,7 +12,7 @@ English | [简体中文](./README-zh_CN.md)
|![notification](docs/img/bell-outline-badge.svg) What is NEW!|
|------------------|
|Feb 4th, 2021. Kruise v0.8.0 is **RELEASED**! It provides refactoring SidecarSet, Deployment hosted by UnitedDeployment, and a new kruise-daemon component which supports image pre-download, please check the [CHANGELOG](CHANGELOG.md) for details.|
|Mar 4th, 2021. Kruise v0.8.0 is **RELEASED**! It provides refactoring SidecarSet, Deployment hosted by UnitedDeployment, and a new kruise-daemon component which supports image pre-download, please check the [CHANGELOG](CHANGELOG.md) for details.|
|Dec 16th, 2020. Kruise v0.7.0 is **RELEASED**! It provides a new CRD named AdvancedCronJob, promotes AdvancedStatefulSet to v1beta1 and a few features in other controllers, please check the [CHANGELOG](CHANGELOG.md) for details.|
|Oct 1st, 2020. Kruise v0.6.1 is **RELEASED**! It provides various features and bugfix, such as CloneSet lifecycle hook and UnitedDeployment supported CloneSet, please check the [CHANGELOG](CHANGELOG.md) for details.|
@ -90,7 +90,7 @@ You are warmly welcome to hack on Kruise. We have prepared a detailed guide [CON
Active communication channels:
- Slack: [channel address](https://join.slack.com/t/kruise-workspace/shared_invite/enQtNjU5NzQ0ODcyNjYzLWJlZGJiZjUwNGU5Y2U2ODI3N2JiODI4N2M1OWFlOTgzMDgyOWVkZGRjNzdmZTBjYzgxZmM5MjAyNjhhZTdmMjQ)
- Slack: [Channel in Kubernetes Slack](https://kubernetes.slack.com/channels/openkruise)
- Mailing List: todo
- Dingtalk Group(钉钉讨论群)

View File

@ -12,6 +12,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
@ -43,7 +44,7 @@ func (p *podEventHandler) Create(evt event.CreateEvent, q workqueue.RateLimiting
}
if controllerRef := metav1.GetControllerOf(pod); controllerRef != nil && isBroadcastJobController(controllerRef) {
key := types.NamespacedName{Namespace: pod.Namespace, Name: controllerRef.Name}.String()
scaleExpectations.ObserveScale(key, expectations.Create, pod.Spec.NodeName)
scaleExpectations.ObserveScale(key, expectations.Create, getAssignedNode(pod))
p.enqueueHandler.Create(evt, q)
}
}
@ -52,7 +53,7 @@ func (p *podEventHandler) Delete(evt event.DeleteEvent, q workqueue.RateLimiting
pod := evt.Object.(*v1.Pod)
if controllerRef := metav1.GetControllerOf(pod); controllerRef != nil && isBroadcastJobController(controllerRef) {
key := types.NamespacedName{Namespace: pod.Namespace, Name: controllerRef.Name}.String()
scaleExpectations.ObserveScale(key, expectations.Delete, pod.Spec.NodeName)
scaleExpectations.ObserveScale(key, expectations.Delete, getAssignedNode(pod))
p.enqueueHandler.Delete(evt, q)
}
}
@ -201,3 +202,23 @@ func shouldIgnoreNodeUpdate(oldNode, curNode v1.Node) bool {
oldNode.Status.Conditions = curNode.Status.Conditions
return apiequality.Semantic.DeepEqual(oldNode, curNode)
}
func getAssignedNode(pod *v1.Pod) string {
if pod.Spec.NodeName != "" {
return pod.Spec.NodeName
}
if pod.Spec.Affinity != nil &&
pod.Spec.Affinity.NodeAffinity != nil &&
pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution != nil {
terms := pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms
for _, t := range terms {
for _, req := range t.MatchFields {
if req.Key == schedulerapi.NodeFieldSelectorKeyNodeName && req.Operator == v1.NodeSelectorOpIn && len(req.Values) == 1 {
return req.Values[0]
}
}
}
}
klog.Warningf("Not found assigned node in Pod %s/%s", pod.Namespace, pod.Name)
return ""
}

View File

@ -0,0 +1,202 @@
/*
Copyright 2021 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package criruntime
import (
"context"
"fmt"
"os"
"time"
runtimeimage "github.com/openkruise/kruise/pkg/daemon/criruntime/imageruntime"
daemonutil "github.com/openkruise/kruise/pkg/daemon/util"
criapi "k8s.io/cri-api/pkg/apis"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
"k8s.io/klog"
kubeletremote "k8s.io/kubernetes/pkg/kubelet/remote"
kubeletutil "k8s.io/kubernetes/pkg/kubelet/util"
)
const (
kubeRuntimeAPIVersion = "0.1.0"
)
// Factory is the interface to get container and image runtime service
type Factory interface {
GetImageService() runtimeimage.ImageService
GetRuntimeService() criapi.RuntimeService
GetRuntimeServiceByName(runtimeName string) criapi.RuntimeService
}
type ContainerRuntimeType string
const (
ContainerRuntimeDocker = "docker"
ContainerRuntimeContainerd = "containerd"
ContainerRuntimePouch = "pouch"
)
type runtimeConfig struct {
runtimeType ContainerRuntimeType
runtimeURI string
runtimeRemoteURI string
}
type factory struct {
impls []*runtimeImpl
}
type runtimeImpl struct {
cfg runtimeConfig
runtimeName string
imageService runtimeimage.ImageService
runtimeService criapi.RuntimeService
}
func NewFactory(varRunPath string, accountManager daemonutil.ImagePullAccountManager) (Factory, error) {
cfgs := detectRuntime(varRunPath)
if len(cfgs) == 0 {
return nil, fmt.Errorf("not found container runtime sock")
}
var err error
f := &factory{}
var cfg runtimeConfig
for i := range cfgs {
cfg = cfgs[i]
var imageService runtimeimage.ImageService
var runtimeService criapi.RuntimeService
var typedVersion *runtimeapi.VersionResponse
switch cfg.runtimeType {
case ContainerRuntimeDocker:
imageService, err = runtimeimage.NewDockerImageService(cfg.runtimeURI, accountManager)
case ContainerRuntimePouch:
imageService, err = runtimeimage.NewPouchImageService(cfg.runtimeURI, accountManager)
case ContainerRuntimeContainerd:
addr, _, _ := kubeletutil.GetAddressAndDialer(cfg.runtimeRemoteURI)
imageService, err = runtimeimage.NewContainerdImageService(addr, accountManager)
}
if err != nil {
klog.Warningf("Failed to new image service for %v (%s, %s): %v", cfg.runtimeType, cfg.runtimeURI, cfg.runtimeRemoteURI, err)
continue
}
if _, err = imageService.ListImages(context.TODO()); err != nil {
klog.Warningf("Failed to list images for %v (%s, %s): %v", cfg.runtimeType, cfg.runtimeURI, cfg.runtimeRemoteURI, err)
continue
}
runtimeService, err = kubeletremote.NewRemoteRuntimeService(cfg.runtimeRemoteURI, time.Second*5)
if err != nil {
klog.Warningf("Failed to new runtime service for %v (%s, %s): %v", cfg.runtimeType, cfg.runtimeURI, cfg.runtimeRemoteURI, err)
continue
}
typedVersion, err = runtimeService.Version(kubeRuntimeAPIVersion)
if err != nil {
klog.Warningf("Failed to get runtime typed version for %v (%s, %s): %v", cfg.runtimeType, cfg.runtimeURI, cfg.runtimeRemoteURI, err)
continue
}
klog.V(2).Infof("Add runtime impl %v, URI: (%s, %s)", typedVersion.RuntimeName, cfg.runtimeURI, cfg.runtimeRemoteURI)
f.impls = append(f.impls, &runtimeImpl{
cfg: cfg,
runtimeName: typedVersion.RuntimeName,
imageService: imageService,
runtimeService: runtimeService,
})
}
if len(f.impls) == 0 {
return nil, err
}
return f, nil
}
func (f *factory) GetImageService() runtimeimage.ImageService {
return f.impls[0].imageService
}
func (f *factory) GetRuntimeService() criapi.RuntimeService {
return f.impls[0].runtimeService
}
func (f *factory) GetRuntimeServiceByName(runtimeName string) criapi.RuntimeService {
for _, impl := range f.impls {
if impl.runtimeName == runtimeName {
return impl.runtimeService
}
}
return nil
}
func detectRuntime(varRunPath string) []runtimeConfig {
var err error
var cfgs []runtimeConfig
// pouch
{
_, err1 := os.Stat(fmt.Sprintf("%s/pouchd.sock", varRunPath))
_, err2 := os.Stat(fmt.Sprintf("%s/pouchcri.sock", varRunPath))
if err1 == nil && err2 == nil {
cfgs = append(cfgs, runtimeConfig{
runtimeType: ContainerRuntimePouch,
runtimeURI: fmt.Sprintf("unix://%s/pouchd.sock", varRunPath),
runtimeRemoteURI: fmt.Sprintf("unix://%s/pouchcri.sock", varRunPath),
})
} else if err1 == nil && err2 != nil {
klog.Errorf("%s/pouchd.sock exists, but not found %s/pouchcri.sock", varRunPath, varRunPath)
} else if err1 != nil && err2 == nil {
klog.Errorf("%s/pouchdcri.sock exists, but not found %s/pouchd.sock", varRunPath, varRunPath)
}
}
// docker
{
_, err1 := os.Stat(fmt.Sprintf("%s/docker.sock", varRunPath))
_, err2 := os.Stat(fmt.Sprintf("%s/dockershim.sock", varRunPath))
if err1 == nil && err2 == nil {
cfgs = append(cfgs, runtimeConfig{
runtimeType: ContainerRuntimeDocker,
runtimeURI: fmt.Sprintf("unix://%s/docker.sock", varRunPath),
runtimeRemoteURI: fmt.Sprintf("unix://%s/dockershim.sock", varRunPath),
})
} else if err1 == nil && err2 != nil {
klog.Errorf("%s/docker.sock exists, but not found %s/dockershim.sock", varRunPath, varRunPath)
} else if err1 != nil && err2 == nil {
klog.Errorf("%s/dockershim.sock exists, but not found %s/docker.sock", varRunPath, varRunPath)
}
}
// containerd
{
if _, err = os.Stat(fmt.Sprintf("%s/containerd.sock", varRunPath)); err == nil {
cfgs = append(cfgs, runtimeConfig{
runtimeType: ContainerRuntimeContainerd,
runtimeRemoteURI: fmt.Sprintf("unix://%s/containerd.sock", varRunPath),
})
}
if _, err = os.Stat(fmt.Sprintf("%s/containerd/containerd.sock", varRunPath)); err == nil {
cfgs = append(cfgs, runtimeConfig{
runtimeType: ContainerRuntimeContainerd,
runtimeRemoteURI: fmt.Sprintf("unix://%s/containerd/containerd.sock", varRunPath),
})
}
}
return cfgs
}

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package runtime
package imageruntime
import (
"context"
@ -30,13 +30,13 @@ import (
"github.com/alibaba/pouch/pkg/jsonstream"
"github.com/containerd/containerd"
"github.com/containerd/containerd/defaults"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/pkg/dialer"
"github.com/containerd/containerd/remotes"
"github.com/containerd/containerd/remotes/docker"
"github.com/docker/distribution/reference"
"github.com/docker/docker/errdefs"
"github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
daemonutil "github.com/openkruise/kruise/pkg/daemon/util"
@ -54,11 +54,11 @@ const (
defaultContainerdNamespace = "k8s.io"
)
// NewContainerdImageRuntime returns containerd-type ImageRuntime
func NewContainerdImageRuntime(
// NewContainerdImageService returns containerd-type ImageService
func NewContainerdImageService(
address string, // containerd will be servicing basic API and CRI-API
accountManager daemonutil.ImagePullAccountManager,
) (ImageRuntime, error) {
) (ImageService, error) {
conn, err := getContainerdConn(address)
if err != nil {
return nil, err
@ -93,7 +93,7 @@ type containerdImageClient struct {
httpProxy string
}
// PullImage implements ImageRuntime.PullImage.
// PullImage implements ImageService.PullImage.
func (d *containerdImageClient) PullImage(ctx context.Context, imageName, tag string, pullSecrets []v1.Secret) (ImagePullStatusReader, error) {
ctx = namespaces.WithNamespace(ctx, d.namespace)
@ -114,7 +114,7 @@ func (d *containerdImageClient) PullImage(ctx context.Context, imageName, tag st
return d.doPullImage(ctx, namedRef, isSchema1, resolver), nil
}
// ListImages implements ImageRuntime.ListImages.
// ListImages implements ImageService.ListImages.
func (d *containerdImageClient) ListImages(ctx context.Context) ([]ImageInfo, error) {
resp, err := d.criImageClient.ListImages(ctx, &runtimeapi.ListImagesRequest{})
if err != nil {

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package runtime
package imageruntime
import (
"context"
@ -23,8 +23,8 @@ import (
"github.com/alibaba/pouch/pkg/jsonstream"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/remotes"
"github.com/docker/docker/errdefs"
digest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package runtime
package imageruntime
import (
"context"
@ -29,16 +29,16 @@ import (
"k8s.io/klog"
)
// NewDockerImageRuntime create a docker runtime
func NewDockerImageRuntime(runtimeURI string, accountManager daemonutil.ImagePullAccountManager) (ImageRuntime, error) {
r := &dockerImageRuntime{runtimeURI: runtimeURI, accountManager: accountManager}
// NewDockerImageService create a docker runtime
func NewDockerImageService(runtimeURI string, accountManager daemonutil.ImagePullAccountManager) (ImageService, error) {
r := &dockerImageService{runtimeURI: runtimeURI, accountManager: accountManager}
if err := r.createRuntimeClientIfNecessary(); err != nil {
return nil, err
}
return r, nil
}
type dockerImageRuntime struct {
type dockerImageService struct {
sync.Mutex
runtimeURI string
accountManager daemonutil.ImagePullAccountManager
@ -46,7 +46,7 @@ type dockerImageRuntime struct {
client *dockerapi.Client
}
func (d *dockerImageRuntime) createRuntimeClientIfNecessary() error {
func (d *dockerImageService) createRuntimeClientIfNecessary() error {
d.Lock()
defer d.Unlock()
if d.client != nil {
@ -60,7 +60,7 @@ func (d *dockerImageRuntime) createRuntimeClientIfNecessary() error {
return nil
}
func (d *dockerImageRuntime) handleRuntimeError(err error) {
func (d *dockerImageService) handleRuntimeError(err error) {
if filterCloseErr(err) {
d.Lock()
defer d.Unlock()
@ -68,7 +68,7 @@ func (d *dockerImageRuntime) handleRuntimeError(err error) {
}
}
func (d *dockerImageRuntime) PullImage(ctx context.Context, imageName, tag string, pullSecrets []v1.Secret) (reader ImagePullStatusReader, err error) {
func (d *dockerImageService) PullImage(ctx context.Context, imageName, tag string, pullSecrets []v1.Secret) (reader ImagePullStatusReader, err error) {
if err = d.createRuntimeClientIfNecessary(); err != nil {
return nil, err
}
@ -132,7 +132,7 @@ func (d *dockerImageRuntime) PullImage(ctx context.Context, imageName, tag strin
return newImagePullStatusReader(ioReader), nil
}
func (d *dockerImageRuntime) ListImages(ctx context.Context) ([]ImageInfo, error) {
func (d *dockerImageService) ListImages(ctx context.Context) ([]ImageInfo, error) {
if err := d.createRuntimeClientIfNecessary(); err != nil {
return nil, err
}

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package runtime
package imageruntime
import (
"encoding/json"

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package runtime
package imageruntime
import (
"context"
@ -45,7 +45,7 @@ type ImagePullStatusReader interface {
Close()
}
type ImageRuntime interface {
type ImageService interface {
PullImage(ctx context.Context, imageName, tag string, pullSecrets []v1.Secret) (ImagePullStatusReader, error)
ListImages(ctx context.Context) ([]ImageInfo, error)
}

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package runtime
package imageruntime
import (
"context"
@ -31,16 +31,16 @@ import (
"k8s.io/klog"
)
// NewPouchImageRuntime create a pouch runtime client
func NewPouchImageRuntime(runtimeURI string, accountManager daemonutil.ImagePullAccountManager) (ImageRuntime, error) {
r := &pouchImageRuntime{runtimeURI: runtimeURI, accountManager: accountManager}
// NewPouchImageService create a pouch runtime client
func NewPouchImageService(runtimeURI string, accountManager daemonutil.ImagePullAccountManager) (ImageService, error) {
r := &pouchImageService{runtimeURI: runtimeURI, accountManager: accountManager}
if err := r.createRuntimeClientIfNecessary(); err != nil {
return nil, err
}
return r, nil
}
type pouchImageRuntime struct {
type pouchImageService struct {
sync.Mutex
runtimeURI string
accountManager daemonutil.ImagePullAccountManager
@ -48,7 +48,7 @@ type pouchImageRuntime struct {
client pouchapi.ImageAPIClient
}
func (d *pouchImageRuntime) createRuntimeClientIfNecessary() error {
func (d *pouchImageService) createRuntimeClientIfNecessary() error {
d.Lock()
defer d.Unlock()
if d.client != nil {
@ -62,7 +62,7 @@ func (d *pouchImageRuntime) createRuntimeClientIfNecessary() error {
return nil
}
func (d *pouchImageRuntime) handleRuntimeError(err error) {
func (d *pouchImageService) handleRuntimeError(err error) {
if filterCloseErr(err) {
d.Lock()
defer d.Unlock()
@ -70,7 +70,7 @@ func (d *pouchImageRuntime) handleRuntimeError(err error) {
}
}
func (d *pouchImageRuntime) PullImage(ctx context.Context, imageName, tag string, pullSecrets []v1.Secret) (reader ImagePullStatusReader, err error) {
func (d *pouchImageService) PullImage(ctx context.Context, imageName, tag string, pullSecrets []v1.Secret) (reader ImagePullStatusReader, err error) {
if err = d.createRuntimeClientIfNecessary(); err != nil {
return nil, err
}
@ -133,7 +133,7 @@ func (d *pouchImageRuntime) PullImage(ctx context.Context, imageName, tag string
return newImagePullStatusReader(ioReader), nil
}
func (d *pouchImageRuntime) ListImages(ctx context.Context) ([]ImageInfo, error) {
func (d *pouchImageService) ListImages(ctx context.Context) ([]ImageInfo, error) {
if err := d.createRuntimeClientIfNecessary(); err != nil {
return nil, err
}

View File

@ -24,8 +24,8 @@ import (
"sync"
"github.com/openkruise/kruise/pkg/client"
daemonruntime "github.com/openkruise/kruise/pkg/daemon/criruntime"
"github.com/openkruise/kruise/pkg/daemon/imagepuller"
daemonruntime "github.com/openkruise/kruise/pkg/daemon/runtime"
daemonutil "github.com/openkruise/kruise/pkg/daemon/util"
"github.com/prometheus/client_golang/prometheus/promhttp"
"k8s.io/client-go/rest"

View File

@ -23,7 +23,7 @@ import (
"time"
appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
daemonruntime "github.com/openkruise/kruise/pkg/daemon/runtime"
runtimeimage "github.com/openkruise/kruise/pkg/daemon/criruntime/imageruntime"
daemonutil "github.com/openkruise/kruise/pkg/daemon/util"
"github.com/openkruise/kruise/pkg/util"
v1 "k8s.io/api/core/v1"
@ -53,7 +53,7 @@ type puller interface {
type realPuller struct {
sync.Mutex
runtime daemonruntime.ImageRuntime
runtime runtimeimage.ImageService
secretManager daemonutil.SecretManager
eventRecorder record.EventRecorder
@ -62,12 +62,7 @@ type realPuller struct {
var _ puller = &realPuller{}
func newRealPuller(runtime daemonruntime.ImageRuntime, secretManager daemonutil.SecretManager, eventRecorder record.EventRecorder) (*realPuller, error) {
collection, err := runtime.ListImages(context.TODO())
if err != nil {
return nil, fmt.Errorf("failed to list images: %v", err)
}
klog.V(2).Infof("Initial list images number %v", len(collection))
func newRealPuller(runtime runtimeimage.ImageService, secretManager daemonutil.SecretManager, eventRecorder record.EventRecorder) (*realPuller, error) {
p := &realPuller{
runtime: runtime,
secretManager: secretManager,
@ -135,7 +130,7 @@ type realWorkerPool struct {
sync.Mutex
name string
runtime daemonruntime.ImageRuntime
runtime runtimeimage.ImageService
secretManager daemonutil.SecretManager
eventRecorder record.EventRecorder
pullWorkers map[string]*pullWorker
@ -145,7 +140,7 @@ type realWorkerPool struct {
lastSyncSpec *appsv1alpha1.ImageSpec
}
func newRealWorkerPool(name string, runtime daemonruntime.ImageRuntime, secretManager daemonutil.SecretManager, eventRecorder record.EventRecorder) *realWorkerPool {
func newRealWorkerPool(name string, runtime runtimeimage.ImageService, secretManager daemonutil.SecretManager, eventRecorder record.EventRecorder) *realWorkerPool {
w := &realWorkerPool{
name: name,
runtime: runtime,
@ -267,7 +262,7 @@ func (w *realWorkerPool) UpdateStatus(status *appsv1alpha1.ImageTagStatus) {
w.tagStatuses[status.Tag] = status
}
func newPullWorker(name string, tagSpec appsv1alpha1.ImageTagSpec, secrets []v1.Secret, runtime daemonruntime.ImageRuntime, statusUpdater imageStatusUpdater, ref *v1.ObjectReference, eventRecorder record.EventRecorder) *pullWorker {
func newPullWorker(name string, tagSpec appsv1alpha1.ImageTagSpec, secrets []v1.Secret, runtime runtimeimage.ImageService, statusUpdater imageStatusUpdater, ref *v1.ObjectReference, eventRecorder record.EventRecorder) *pullWorker {
o := &pullWorker{
name: name,
tagSpec: tagSpec,
@ -289,7 +284,7 @@ type pullWorker struct {
name string
tagSpec appsv1alpha1.ImageTagSpec
secrets []v1.Secret
runtime daemonruntime.ImageRuntime
runtime runtimeimage.ImageService
statusUpdater imageStatusUpdater
ref *v1.ObjectReference
eventRecorder record.EventRecorder
@ -409,7 +404,7 @@ func (w *pullWorker) Run() {
}
}
func (w *pullWorker) getImageInfo(ctx context.Context) (*daemonruntime.ImageInfo, error) {
func (w *pullWorker) getImageInfo(ctx context.Context) (*runtimeimage.ImageInfo, error) {
imageInfos, err := w.runtime.ListImages(ctx)
if err != nil {
klog.V(5).Infof("List images failed, err %v", err)

View File

@ -28,7 +28,7 @@ import (
"github.com/openkruise/kruise/pkg/client"
kruiseclient "github.com/openkruise/kruise/pkg/client/clientset/versioned"
listersalpha1 "github.com/openkruise/kruise/pkg/client/listers/apps/v1alpha1"
daemonruntime "github.com/openkruise/kruise/pkg/daemon/runtime"
daemonruntime "github.com/openkruise/kruise/pkg/daemon/criruntime"
daemonutil "github.com/openkruise/kruise/pkg/daemon/util"
nodeimagesutil "github.com/openkruise/kruise/pkg/util/nodeimages"
v1 "k8s.io/api/core/v1"
@ -65,12 +65,12 @@ type Controller struct {
// NewController returns the controller for image pulling
func NewController(runtimeFactory daemonruntime.Factory, secretManager daemonutil.SecretManager, healthz *daemonutil.Healthz) (*Controller, error) {
nodeName, _ := daemonutil.NodeName()
genericClient := client.GetGenericClientWithName("image-puller")
genericClient := client.GetGenericClientWithName("kruise-daemon-imagepuller")
informer := newNodeImageInformer(genericClient.KruiseClient, nodeName)
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: genericClient.KubeClient.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(scheme, v1.EventSource{Component: "imagepuller", Host: nodeName})
recorder := eventBroadcaster.NewRecorder(scheme, v1.EventSource{Component: "kruise-daemon-imagepuller", Host: nodeName})
queue := workqueue.NewNamedRateLimitingQueue(
// Backoff duration from 500ms to 50~55s
@ -101,7 +101,7 @@ func NewController(runtimeFactory daemonruntime.Factory, secretManager daemonuti
},
})
puller, err := newRealPuller(runtimeFactory.GetImageRuntime(), secretManager, recorder)
puller, err := newRealPuller(runtimeFactory.GetImageService(), secretManager, recorder)
if err != nil {
return nil, fmt.Errorf("failed to new puller: %v", err)
}
@ -169,7 +169,7 @@ func (c *Controller) Run(stop <-chan struct{}) {
}
}, time.Second, stop)
klog.Info("Started successfully")
klog.Info("Started puller controller successfully")
<-stop
}

View File

@ -1,111 +0,0 @@
/*
Copyright 2021 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package runtime
import (
"fmt"
"os"
daemonutil "github.com/openkruise/kruise/pkg/daemon/util"
"k8s.io/klog"
)
// Factory is the interface to get container and image runtime service
type Factory interface {
GetImageRuntime() ImageRuntime
}
type ContainerRuntimeType string
const (
ContainerRuntimeDocker = "docker"
ContainerRuntimeContainerd = "containerd"
ContainerRuntimePouch = "pouch"
ContainerRuntimeOther = "other"
)
type runtimeConfig struct {
runtimeType ContainerRuntimeType
runtimeURI string
}
type factory struct {
runtimeConfig
imageRuntime ImageRuntime
}
func NewFactory(varRunPath string, accountManager daemonutil.ImagePullAccountManager) (Factory, error) {
cfgs := detectRuntime(varRunPath)
if len(cfgs) == 0 {
return nil, fmt.Errorf("not found container runtime sock")
}
var err error
var imageRuntime ImageRuntime
var cfg runtimeConfig
for i := range cfgs {
cfg = cfgs[i]
switch cfg.runtimeType {
case ContainerRuntimeDocker:
imageRuntime, err = NewDockerImageRuntime(cfg.runtimeURI, accountManager)
case ContainerRuntimePouch:
imageRuntime, err = NewPouchImageRuntime(cfg.runtimeURI, accountManager)
case ContainerRuntimeContainerd:
imageRuntime, err = NewContainerdImageRuntime(cfg.runtimeURI, accountManager)
case ContainerRuntimeOther:
err = fmt.Errorf("not found container runtime sock")
}
if err != nil {
klog.Warningf("Failed to new image runtime for %v(%s): %v", cfg.runtimeType, cfg.runtimeURI, err)
continue
}
break
}
if err != nil {
return nil, err
}
return &factory{
runtimeConfig: cfg,
imageRuntime: imageRuntime,
}, nil
}
func (f *factory) GetImageRuntime() ImageRuntime {
return f.imageRuntime
}
func detectRuntime(varRunPath string) []runtimeConfig {
var err error
var cfgs []runtimeConfig
if _, err = os.Stat(fmt.Sprintf("%s/pouchd.sock", varRunPath)); err == nil {
cfgs = append(cfgs, runtimeConfig{runtimeType: ContainerRuntimePouch, runtimeURI: fmt.Sprintf("unix://%s/pouchd.sock", varRunPath)})
}
if _, err = os.Stat(fmt.Sprintf("%s/docker.sock", varRunPath)); err == nil {
cfgs = append(cfgs, runtimeConfig{runtimeType: ContainerRuntimeDocker, runtimeURI: fmt.Sprintf("unix://%s/docker.sock", varRunPath)})
}
if _, err = os.Stat(fmt.Sprintf("%s/pouchcri.sock", varRunPath)); err == nil {
cfgs = append(cfgs, runtimeConfig{runtimeType: ContainerRuntimeContainerd, runtimeURI: fmt.Sprintf("%s/pouchcri.sock", varRunPath)})
}
if _, err = os.Stat(fmt.Sprintf("%s/containerd.sock", varRunPath)); err == nil {
cfgs = append(cfgs, runtimeConfig{runtimeType: ContainerRuntimeContainerd, runtimeURI: fmt.Sprintf("%s/containerd.sock", varRunPath)})
}
if _, err = os.Stat(fmt.Sprintf("%s/containerd/containerd.sock", varRunPath)); err == nil {
cfgs = append(cfgs, runtimeConfig{runtimeType: ContainerRuntimeContainerd, runtimeURI: fmt.Sprintf("%s/containerd/containerd.sock", varRunPath)})
}
return cfgs
}

View File

@ -114,7 +114,6 @@ func Initialize(mgr manager.Manager, stopCh <-chan struct{}) error {
StatusClient: mgr.GetClient(),
}
filterActiveHandlers()
c, err := webhookcontroller.New(mgr.GetConfig(), cli, HandlerMap)
if err != nil {
return err

View File

@ -39,15 +39,15 @@ func validateStatefulSetSpec(spec *appsv1beta1.StatefulSetSpec, fldPath *field.P
}
if spec.ReserveOrdinals != nil {
orders := sets.NewInt()
orders := sets.NewInt(spec.ReserveOrdinals...)
if orders.Len() != len(spec.ReserveOrdinals) {
allErrs = append(allErrs, field.Invalid(fldPath.Root(), spec.ReserveOrdinals, "reserveOrdinals contains duplicated items"))
}
for _, i := range spec.ReserveOrdinals {
if i < 0 || i >= int(*spec.Replicas) {
allErrs = append(allErrs, field.Invalid(fldPath.Root(), spec.Template, fmt.Sprintf("reserveOrdinals contains %d which must be 0<=order<replicas", i)))
if i < 0 || i >= int(*spec.Replicas)+orders.Len() {
allErrs = append(allErrs, field.Invalid(fldPath.Root(), spec.ReserveOrdinals, fmt.Sprintf("reserveOrdinals contains %d which must be 0 <= order < (%d+%d)",
i, *spec.Replicas, orders.Len())))
}
if orders.Has(i) {
allErrs = append(allErrs, field.Invalid(fldPath.Root(), spec.Template, fmt.Sprintf("reserveOrdinals contains duplicated %d", i)))
}
orders.Insert(i)
}
}

View File

@ -6,8 +6,8 @@ kubectl delete validatingwebhookconfiguration kruise-validating-webhook-configur
# delete kruise-manager and rbac rules
kubectl delete ns kruise-system
kubectl delete clusterrolebinding kruise-manager-rolebinding
kubectl delete clusterrole kruise-manager-role
kubectl delete clusterrolebinding kruise-manager-rolebinding kruise-daemon-rolebinding
kubectl delete clusterrole kruise-manager-role kruise-daemon-role
# delete CRDs
kubectl get crd -o name | grep "customresourcedefinition.apiextensions.k8s.io/[a-z.]*.kruise.io" | xargs kubectl patch -p '{"spec":{"conversion":null}}'

View File

@ -0,0 +1,126 @@
/*
Copyright 2021 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apps
import (
"time"
"github.com/onsi/ginkgo"
"github.com/onsi/gomega"
appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
kruiseclientset "github.com/openkruise/kruise/pkg/client/clientset/versioned"
"github.com/openkruise/kruise/test/e2e/framework"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/rand"
clientset "k8s.io/client-go/kubernetes"
)
var _ = SIGDescribe("BroadcastJob", func() {
f := framework.NewDefaultFramework("broadcastjobs")
var ns string
var c clientset.Interface
var kc kruiseclientset.Interface
var tester *framework.BroadcastJobTester
var nodeTester *framework.NodeTester
var randStr string
ginkgo.BeforeEach(func() {
c = f.ClientSet
kc = f.KruiseClientSet
ns = f.Namespace.Name
tester = framework.NewBroadcastJobTester(c, kc, ns)
nodeTester = framework.NewNodeTester(c)
randStr = rand.String(10)
})
ginkgo.AfterEach(func() {
err := nodeTester.DeleteFakeNode(randStr)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
})
framework.KruiseDescribe("BroadcastJob dispatching", func() {
ginkgo.It("succeeds for parallelism < number of node", func() {
ginkgo.By("Create Fake Node " + randStr)
fakeNode, err := nodeTester.CreateFakeNode(randStr)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
nodes, err := nodeTester.ListRealNodesWithFake(randStr)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
ginkgo.By("Create BroadcastJob job-" + randStr)
parallelism := intstr.FromInt(1)
job := &appsv1alpha1.BroadcastJob{
ObjectMeta: metav1.ObjectMeta{Namespace: ns, Name: "job-" + randStr},
Spec: appsv1alpha1.BroadcastJobSpec{
Parallelism: &parallelism,
Template: v1.PodTemplateSpec{
Spec: v1.PodSpec{
Tolerations: []v1.Toleration{{Key: framework.E2eFakeKey, Operator: v1.TolerationOpEqual, Value: randStr, Effect: v1.TaintEffectNoSchedule}},
Containers: []v1.Container{{
Name: "pi",
Image: "perl",
Command: []string{"perl", "-Mbignum=bpi", "-wle", "print bpi(1000)"},
}},
RestartPolicy: v1.RestartPolicyNever,
},
},
CompletionPolicy: appsv1alpha1.CompletionPolicy{Type: appsv1alpha1.Always},
},
}
job, err = tester.CreateBroadcastJob(job)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
ginkgo.By("Check the status of job")
gomega.Eventually(func() int32 {
job, err = tester.GetBroadcastJob(job.Name)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
return job.Status.Desired
}, 3*time.Second, time.Second).Should(gomega.Equal(int32(len(nodes))))
gomega.Eventually(func() int {
pods, err := tester.GetPodsOfJob(job)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
var fakePod *v1.Pod
for _, p := range pods {
if p.Spec.NodeName == fakeNode.Name {
fakePod = p
break
}
}
if fakePod != nil && fakePod.Status.Phase != v1.PodSucceeded {
ginkgo.By("Try to update Pod " + fakePod.Name + " to Succeeded")
fakePod.Status.Phase = v1.PodSucceeded
_, err = c.CoreV1().Pods(ns).UpdateStatus(fakePod)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
}
return len(pods)
}, 120*time.Second, 3*time.Second).Should(gomega.Equal(len(nodes)))
gomega.Eventually(func() int32 {
job, err = tester.GetBroadcastJob(job.Name)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
return job.Status.Succeeded
}, 30*time.Second, time.Second).Should(gomega.Equal(int32(len(nodes))))
})
})
})

View File

@ -0,0 +1,62 @@
/*
Copyright 2021 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package framework
import (
appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
kruiseclientset "github.com/openkruise/kruise/pkg/client/clientset/versioned"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clientset "k8s.io/client-go/kubernetes"
)
type BroadcastJobTester struct {
c clientset.Interface
kc kruiseclientset.Interface
ns string
}
func NewBroadcastJobTester(c clientset.Interface, kc kruiseclientset.Interface, ns string) *BroadcastJobTester {
return &BroadcastJobTester{
c: c,
kc: kc,
ns: ns,
}
}
func (t *BroadcastJobTester) CreateBroadcastJob(job *appsv1alpha1.BroadcastJob) (*appsv1alpha1.BroadcastJob, error) {
return t.kc.AppsV1alpha1().BroadcastJobs(t.ns).Create(job)
}
func (t *BroadcastJobTester) GetBroadcastJob(name string) (*appsv1alpha1.BroadcastJob, error) {
return t.kc.AppsV1alpha1().BroadcastJobs(t.ns).Get(name, metav1.GetOptions{})
}
func (t *BroadcastJobTester) GetPodsOfJob(job *appsv1alpha1.BroadcastJob) (pods []*v1.Pod, err error) {
podList, err := t.c.CoreV1().Pods(t.ns).List(metav1.ListOptions{})
if err != nil {
return nil, err
}
for i := range podList.Items {
pod := &podList.Items[i]
controllerRef := metav1.GetControllerOf(pod)
if controllerRef != nil && controllerRef.UID == job.UID {
pods = append(pods, pod)
}
}
return pods, nil
}

View File

@ -0,0 +1,160 @@
/*
Copyright 2021 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package framework
import (
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/util/retry"
"k8s.io/klog"
utilpointer "k8s.io/utils/pointer"
)
const (
E2eFakeKey = "kruise-e2e-fake"
fakeNodeNamePrefix = "fake-node-"
)
type NodeTester struct {
c clientset.Interface
}
func NewNodeTester(c clientset.Interface) *NodeTester {
return &NodeTester{
c: c,
}
}
func (t *NodeTester) CreateFakeNode(randStr string) (node *v1.Node, err error) {
name := fakeNodeNamePrefix + randStr
node = &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Labels: map[string]string{v1.LabelHostname: name, E2eFakeKey: "true"},
},
Spec: v1.NodeSpec{
Taints: []v1.Taint{{Key: E2eFakeKey, Value: randStr, Effect: v1.TaintEffectNoSchedule}},
},
}
node, err = t.c.CoreV1().Nodes().Create(node)
if err != nil {
return nil, err
}
resources := v1.ResourceList{
v1.ResourceCPU: resource.MustParse("4"),
v1.ResourceMemory: resource.MustParse("8Gi"),
v1.ResourceEphemeralStorage: resource.MustParse("20Gi"),
v1.ResourcePods: resource.MustParse("16"),
}
fn := func() error {
return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
node, err = t.c.CoreV1().Nodes().Get(name, metav1.GetOptions{})
if err != nil {
return err
}
node.Status = v1.NodeStatus{
Phase: v1.NodeRunning,
Capacity: resources,
Allocatable: resources,
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionTrue,
Reason: "FakeReady",
LastTransitionTime: metav1.Now(),
LastHeartbeatTime: metav1.Now(),
},
},
}
node, err = t.c.CoreV1().Nodes().UpdateStatus(node)
return err
})
}
err = fn()
if err != nil {
return nil, err
}
// start a goroutine to keep updating ready condition
go func() {
var noNode bool
for {
time.Sleep(time.Second * 3)
if !noNode {
err = fn()
if err != nil {
if !errors.IsNotFound(err) {
klog.Errorf("Failed to update status of fake Node %s: %v", name, err)
}
noNode = true
}
}
podList, err := t.c.CoreV1().Pods(v1.NamespaceAll).List(metav1.ListOptions{FieldSelector: "spec.nodeName=" + name})
if err != nil {
klog.Errorf("Failed to get Pods of fake Node %s: %v", name, err)
return
}
for i := range podList.Items {
pod := &podList.Items[i]
if pod.DeletionTimestamp != nil && pod.DeletionGracePeriodSeconds != nil {
t.c.CoreV1().Pods(pod.Namespace).Delete(pod.Name, &metav1.DeleteOptions{GracePeriodSeconds: utilpointer.Int64Ptr(0)})
}
}
if len(podList.Items) == 0 && noNode {
return
}
}
}()
return node, nil
}
func (t *NodeTester) DeleteFakeNode(randStr string) error {
name := "fake-node-" + randStr
err := t.c.CoreV1().Nodes().Delete(name, &metav1.DeleteOptions{})
if err != nil && !errors.IsNotFound(err) {
return err
}
return nil
}
func (t *NodeTester) ListRealNodesWithFake(randStr string) ([]*v1.Node, error) {
nodeList, err := t.c.CoreV1().Nodes().List(metav1.ListOptions{})
if err != nil {
return nil, err
}
var nodes []*v1.Node
for i := range nodeList.Items {
node := &nodeList.Items[i]
if len(node.Spec.Taints) > 1 {
continue
} else if len(node.Spec.Taints) == 1 && (node.Spec.Taints[0].Key != E2eFakeKey || node.Spec.Taints[0].Value != randStr) {
continue
}
nodes = append(nodes, node)
}
return nodes, nil
}

View File

@ -1,74 +0,0 @@
package errdefs // import "github.com/docker/docker/errdefs"
// ErrNotFound signals that the requested object doesn't exist
type ErrNotFound interface {
NotFound()
}
// ErrInvalidParameter signals that the user input is invalid
type ErrInvalidParameter interface {
InvalidParameter()
}
// ErrConflict signals that some internal state conflicts with the requested action and can't be performed.
// A change in state should be able to clear this error.
type ErrConflict interface {
Conflict()
}
// ErrUnauthorized is used to signify that the user is not authorized to perform a specific action
type ErrUnauthorized interface {
Unauthorized()
}
// ErrUnavailable signals that the requested action/subsystem is not available.
type ErrUnavailable interface {
Unavailable()
}
// ErrForbidden signals that the requested action cannot be performed under any circumstances.
// When a ErrForbidden is returned, the caller should never retry the action.
type ErrForbidden interface {
Forbidden()
}
// ErrSystem signals that some internal error occurred.
// An example of this would be a failed mount request.
type ErrSystem interface {
System()
}
// ErrNotModified signals that an action can't be performed because it's already in the desired state
type ErrNotModified interface {
NotModified()
}
// ErrAlreadyExists is a special case of ErrConflict which signals that the desired object already exists
type ErrAlreadyExists interface {
AlreadyExists()
}
// ErrNotImplemented signals that the requested action/feature is not implemented on the system as configured.
type ErrNotImplemented interface {
NotImplemented()
}
// ErrUnknown signals that the kind of error that occurred is not known.
type ErrUnknown interface {
Unknown()
}
// ErrCancelled signals that the action was cancelled.
type ErrCancelled interface {
Cancelled()
}
// ErrDeadline signals that the deadline was reached before the action completed.
type ErrDeadline interface {
DeadlineExceeded()
}
// ErrDataLoss indicates that data was lost or there is data corruption.
type ErrDataLoss interface {
DataLoss()
}

View File

@ -1,8 +0,0 @@
// Package errdefs defines a set of error interfaces that packages should use for communicating classes of errors.
// Errors that cross the package boundary should implement one (and only one) of these interfaces.
//
// Packages should not reference these interfaces directly, only implement them.
// To check if a particular error implements one of these interfaces, there are helper
// functions provided (e.g. `Is<SomeError>`) which can be used rather than asserting the interfaces directly.
// If you must assert on these interfaces, be sure to check the causal chain (`err.Cause()`).
package errdefs // import "github.com/docker/docker/errdefs"

View File

@ -1,240 +0,0 @@
package errdefs // import "github.com/docker/docker/errdefs"
import "context"
type errNotFound struct{ error }
func (errNotFound) NotFound() {}
func (e errNotFound) Cause() error {
return e.error
}
// NotFound is a helper to create an error of the class with the same name from any error type
func NotFound(err error) error {
if err == nil {
return nil
}
return errNotFound{err}
}
type errInvalidParameter struct{ error }
func (errInvalidParameter) InvalidParameter() {}
func (e errInvalidParameter) Cause() error {
return e.error
}
// InvalidParameter is a helper to create an error of the class with the same name from any error type
func InvalidParameter(err error) error {
if err == nil {
return nil
}
return errInvalidParameter{err}
}
type errConflict struct{ error }
func (errConflict) Conflict() {}
func (e errConflict) Cause() error {
return e.error
}
// Conflict is a helper to create an error of the class with the same name from any error type
func Conflict(err error) error {
if err == nil {
return nil
}
return errConflict{err}
}
type errUnauthorized struct{ error }
func (errUnauthorized) Unauthorized() {}
func (e errUnauthorized) Cause() error {
return e.error
}
// Unauthorized is a helper to create an error of the class with the same name from any error type
func Unauthorized(err error) error {
if err == nil {
return nil
}
return errUnauthorized{err}
}
type errUnavailable struct{ error }
func (errUnavailable) Unavailable() {}
func (e errUnavailable) Cause() error {
return e.error
}
// Unavailable is a helper to create an error of the class with the same name from any error type
func Unavailable(err error) error {
return errUnavailable{err}
}
type errForbidden struct{ error }
func (errForbidden) Forbidden() {}
func (e errForbidden) Cause() error {
return e.error
}
// Forbidden is a helper to create an error of the class with the same name from any error type
func Forbidden(err error) error {
if err == nil {
return nil
}
return errForbidden{err}
}
type errSystem struct{ error }
func (errSystem) System() {}
func (e errSystem) Cause() error {
return e.error
}
// System is a helper to create an error of the class with the same name from any error type
func System(err error) error {
if err == nil {
return nil
}
return errSystem{err}
}
type errNotModified struct{ error }
func (errNotModified) NotModified() {}
func (e errNotModified) Cause() error {
return e.error
}
// NotModified is a helper to create an error of the class with the same name from any error type
func NotModified(err error) error {
if err == nil {
return nil
}
return errNotModified{err}
}
type errAlreadyExists struct{ error }
func (errAlreadyExists) AlreadyExists() {}
func (e errAlreadyExists) Cause() error {
return e.error
}
// AlreadyExists is a helper to create an error of the class with the same name from any error type
func AlreadyExists(err error) error {
if err == nil {
return nil
}
return errAlreadyExists{err}
}
type errNotImplemented struct{ error }
func (errNotImplemented) NotImplemented() {}
func (e errNotImplemented) Cause() error {
return e.error
}
// NotImplemented is a helper to create an error of the class with the same name from any error type
func NotImplemented(err error) error {
if err == nil {
return nil
}
return errNotImplemented{err}
}
type errUnknown struct{ error }
func (errUnknown) Unknown() {}
func (e errUnknown) Cause() error {
return e.error
}
// Unknown is a helper to create an error of the class with the same name from any error type
func Unknown(err error) error {
if err == nil {
return nil
}
return errUnknown{err}
}
type errCancelled struct{ error }
func (errCancelled) Cancelled() {}
func (e errCancelled) Cause() error {
return e.error
}
// Cancelled is a helper to create an error of the class with the same name from any error type
func Cancelled(err error) error {
if err == nil {
return nil
}
return errCancelled{err}
}
type errDeadline struct{ error }
func (errDeadline) DeadlineExceeded() {}
func (e errDeadline) Cause() error {
return e.error
}
// Deadline is a helper to create an error of the class with the same name from any error type
func Deadline(err error) error {
if err == nil {
return nil
}
return errDeadline{err}
}
type errDataLoss struct{ error }
func (errDataLoss) DataLoss() {}
func (e errDataLoss) Cause() error {
return e.error
}
// DataLoss is a helper to create an error of the class with the same name from any error type
func DataLoss(err error) error {
if err == nil {
return nil
}
return errDataLoss{err}
}
// FromContext returns the error class from the passed in context
func FromContext(ctx context.Context) error {
e := ctx.Err()
if e == nil {
return nil
}
if e == context.Canceled {
return Cancelled(e)
}
if e == context.DeadlineExceeded {
return Deadline(e)
}
return Unknown(e)
}

View File

@ -1,114 +0,0 @@
package errdefs // import "github.com/docker/docker/errdefs"
type causer interface {
Cause() error
}
func getImplementer(err error) error {
switch e := err.(type) {
case
ErrNotFound,
ErrInvalidParameter,
ErrConflict,
ErrUnauthorized,
ErrUnavailable,
ErrForbidden,
ErrSystem,
ErrNotModified,
ErrAlreadyExists,
ErrNotImplemented,
ErrCancelled,
ErrDeadline,
ErrDataLoss,
ErrUnknown:
return err
case causer:
return getImplementer(e.Cause())
default:
return err
}
}
// IsNotFound returns if the passed in error is an ErrNotFound
func IsNotFound(err error) bool {
_, ok := getImplementer(err).(ErrNotFound)
return ok
}
// IsInvalidParameter returns if the passed in error is an ErrInvalidParameter
func IsInvalidParameter(err error) bool {
_, ok := getImplementer(err).(ErrInvalidParameter)
return ok
}
// IsConflict returns if the passed in error is an ErrConflict
func IsConflict(err error) bool {
_, ok := getImplementer(err).(ErrConflict)
return ok
}
// IsUnauthorized returns if the passed in error is an ErrUnauthorized
func IsUnauthorized(err error) bool {
_, ok := getImplementer(err).(ErrUnauthorized)
return ok
}
// IsUnavailable returns if the passed in error is an ErrUnavailable
func IsUnavailable(err error) bool {
_, ok := getImplementer(err).(ErrUnavailable)
return ok
}
// IsForbidden returns if the passed in error is an ErrForbidden
func IsForbidden(err error) bool {
_, ok := getImplementer(err).(ErrForbidden)
return ok
}
// IsSystem returns if the passed in error is an ErrSystem
func IsSystem(err error) bool {
_, ok := getImplementer(err).(ErrSystem)
return ok
}
// IsNotModified returns if the passed in error is a NotModified error
func IsNotModified(err error) bool {
_, ok := getImplementer(err).(ErrNotModified)
return ok
}
// IsAlreadyExists returns if the passed in error is a AlreadyExists error
func IsAlreadyExists(err error) bool {
_, ok := getImplementer(err).(ErrAlreadyExists)
return ok
}
// IsNotImplemented returns if the passed in error is an ErrNotImplemented
func IsNotImplemented(err error) bool {
_, ok := getImplementer(err).(ErrNotImplemented)
return ok
}
// IsUnknown returns if the passed in error is an ErrUnknown
func IsUnknown(err error) bool {
_, ok := getImplementer(err).(ErrUnknown)
return ok
}
// IsCancelled returns if the passed in error is an ErrCancelled
func IsCancelled(err error) bool {
_, ok := getImplementer(err).(ErrCancelled)
return ok
}
// IsDeadline returns if the passed in error is an ErrDeadline
func IsDeadline(err error) bool {
_, ok := getImplementer(err).(ErrDeadline)
return ok
}
// IsDataLoss returns if the passed in error is an ErrDataLoss
func IsDataLoss(err error) bool {
_, ok := getImplementer(err).(ErrDataLoss)
return ok
}

119
vendor/k8s.io/cri-api/pkg/apis/services.go generated vendored Normal file
View File

@ -0,0 +1,119 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cri
import (
"time"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
)
// RuntimeVersioner contains methods for runtime name, version and API version.
type RuntimeVersioner interface {
// Version returns the runtime name, runtime version and runtime API version
Version(apiVersion string) (*runtimeapi.VersionResponse, error)
}
// ContainerManager contains methods to manipulate containers managed by a
// container runtime. The methods are thread-safe.
type ContainerManager interface {
// CreateContainer creates a new container in specified PodSandbox.
CreateContainer(podSandboxID string, config *runtimeapi.ContainerConfig, sandboxConfig *runtimeapi.PodSandboxConfig) (string, error)
// StartContainer starts the container.
StartContainer(containerID string) error
// StopContainer stops a running container with a grace period (i.e., timeout).
StopContainer(containerID string, timeout int64) error
// RemoveContainer removes the container.
RemoveContainer(containerID string) error
// ListContainers lists all containers by filters.
ListContainers(filter *runtimeapi.ContainerFilter) ([]*runtimeapi.Container, error)
// ContainerStatus returns the status of the container.
ContainerStatus(containerID string) (*runtimeapi.ContainerStatus, error)
// UpdateContainerResources updates the cgroup resources for the container.
UpdateContainerResources(containerID string, resources *runtimeapi.LinuxContainerResources) error
// ExecSync executes a command in the container, and returns the stdout output.
// If command exits with a non-zero exit code, an error is returned.
ExecSync(containerID string, cmd []string, timeout time.Duration) (stdout []byte, stderr []byte, err error)
// Exec prepares a streaming endpoint to execute a command in the container, and returns the address.
Exec(*runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error)
// Attach prepares a streaming endpoint to attach to a running container, and returns the address.
Attach(req *runtimeapi.AttachRequest) (*runtimeapi.AttachResponse, error)
// ReopenContainerLog asks runtime to reopen the stdout/stderr log file
// for the container. If it returns error, new container log file MUST NOT
// be created.
ReopenContainerLog(ContainerID string) error
}
// PodSandboxManager contains methods for operating on PodSandboxes. The methods
// are thread-safe.
type PodSandboxManager interface {
// RunPodSandbox creates and starts a pod-level sandbox. Runtimes should ensure
// the sandbox is in ready state.
RunPodSandbox(config *runtimeapi.PodSandboxConfig, runtimeHandler string) (string, error)
// StopPodSandbox stops the sandbox. If there are any running containers in the
// sandbox, they should be force terminated.
StopPodSandbox(podSandboxID string) error
// RemovePodSandbox removes the sandbox. If there are running containers in the
// sandbox, they should be forcibly removed.
RemovePodSandbox(podSandboxID string) error
// PodSandboxStatus returns the Status of the PodSandbox.
PodSandboxStatus(podSandboxID string) (*runtimeapi.PodSandboxStatus, error)
// ListPodSandbox returns a list of Sandbox.
ListPodSandbox(filter *runtimeapi.PodSandboxFilter) ([]*runtimeapi.PodSandbox, error)
// PortForward prepares a streaming endpoint to forward ports from a PodSandbox, and returns the address.
PortForward(*runtimeapi.PortForwardRequest) (*runtimeapi.PortForwardResponse, error)
}
// ContainerStatsManager contains methods for retrieving the container
// statistics.
type ContainerStatsManager interface {
// ContainerStats returns stats of the container. If the container does not
// exist, the call returns an error.
ContainerStats(containerID string) (*runtimeapi.ContainerStats, error)
// ListContainerStats returns stats of all running containers.
ListContainerStats(filter *runtimeapi.ContainerStatsFilter) ([]*runtimeapi.ContainerStats, error)
}
// RuntimeService interface should be implemented by a container runtime.
// The methods should be thread-safe.
type RuntimeService interface {
RuntimeVersioner
ContainerManager
PodSandboxManager
ContainerStatsManager
// UpdateRuntimeConfig updates runtime configuration if specified
UpdateRuntimeConfig(runtimeConfig *runtimeapi.RuntimeConfig) error
// Status returns the status of the runtime.
Status() (*runtimeapi.RuntimeStatus, error)
}
// ImageManagerService interface should be implemented by a container image
// manager.
// The methods should be thread-safe.
type ImageManagerService interface {
// ListImages lists the existing images.
ListImages(filter *runtimeapi.ImageFilter) ([]*runtimeapi.Image, error)
// ImageStatus returns the status of the image.
ImageStatus(image *runtimeapi.ImageSpec) (*runtimeapi.Image, error)
// PullImage pulls an image with the authentication config.
PullImage(image *runtimeapi.ImageSpec, auth *runtimeapi.AuthConfig, podSandboxConfig *runtimeapi.PodSandboxConfig) (string, error)
// RemoveImage removes the image.
RemoveImage(image *runtimeapi.ImageSpec) error
// ImageFsInfo returns information of the filesystem that is used to store images.
ImageFsInfo() ([]*runtimeapi.FilesystemUsage, error)
}

56
vendor/k8s.io/kubernetes/pkg/kubelet/remote/BUILD generated vendored Normal file
View File

@ -0,0 +1,56 @@
package(default_visibility = ["//visibility:public"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
go_library(
name = "go_default_library",
srcs = [
"doc.go",
"remote_image.go",
"remote_runtime.go",
"utils.go",
],
importpath = "k8s.io/kubernetes/pkg/kubelet/remote",
deps = [
"//pkg/kubelet/util:go_default_library",
"//pkg/kubelet/util/logreduction:go_default_library",
"//staging/src/k8s.io/cri-api/pkg/apis:go_default_library",
"//staging/src/k8s.io/cri-api/pkg/apis/runtime/v1alpha2:go_default_library",
"//vendor/google.golang.org/grpc:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
"//vendor/k8s.io/utils/exec:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [
":package-srcs",
"//pkg/kubelet/remote/fake:all-srcs",
],
tags = ["automanaged"],
)
go_test(
name = "go_default_test",
srcs = ["remote_runtime_test.go"],
embed = [":go_default_library"],
deps = [
"//pkg/kubelet/remote/fake:go_default_library",
"//staging/src/k8s.io/cri-api/pkg/apis:go_default_library",
"//staging/src/k8s.io/cri-api/pkg/apis/testing:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/github.com/stretchr/testify/require:go_default_library",
],
)

4
vendor/k8s.io/kubernetes/pkg/kubelet/remote/OWNERS generated vendored Normal file
View File

@ -0,0 +1,4 @@
# See the OWNERS docs at https://go.k8s.io/owners
approvers:
- feiskyer

19
vendor/k8s.io/kubernetes/pkg/kubelet/remote/doc.go generated vendored Normal file
View File

@ -0,0 +1,19 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package remote contains gRPC implementation of internalapi.RuntimeService
// and internalapi.ImageManagerService.
package remote

View File

@ -0,0 +1,155 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package remote
import (
"context"
"errors"
"fmt"
"time"
"google.golang.org/grpc"
"k8s.io/klog"
internalapi "k8s.io/cri-api/pkg/apis"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
"k8s.io/kubernetes/pkg/kubelet/util"
)
// RemoteImageService is a gRPC implementation of internalapi.ImageManagerService.
type RemoteImageService struct {
timeout time.Duration
imageClient runtimeapi.ImageServiceClient
}
// NewRemoteImageService creates a new internalapi.ImageManagerService.
func NewRemoteImageService(endpoint string, connectionTimeout time.Duration) (internalapi.ImageManagerService, error) {
klog.V(3).Infof("Connecting to image service %s", endpoint)
addr, dailer, err := util.GetAddressAndDialer(endpoint)
if err != nil {
return nil, err
}
ctx, cancel := context.WithTimeout(context.Background(), connectionTimeout)
defer cancel()
conn, err := grpc.DialContext(ctx, addr, grpc.WithInsecure(), grpc.WithDialer(dailer), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)))
if err != nil {
klog.Errorf("Connect remote image service %s failed: %v", addr, err)
return nil, err
}
return &RemoteImageService{
timeout: connectionTimeout,
imageClient: runtimeapi.NewImageServiceClient(conn),
}, nil
}
// ListImages lists available images.
func (r *RemoteImageService) ListImages(filter *runtimeapi.ImageFilter) ([]*runtimeapi.Image, error) {
ctx, cancel := getContextWithTimeout(r.timeout)
defer cancel()
resp, err := r.imageClient.ListImages(ctx, &runtimeapi.ListImagesRequest{
Filter: filter,
})
if err != nil {
klog.Errorf("ListImages with filter %+v from image service failed: %v", filter, err)
return nil, err
}
return resp.Images, nil
}
// ImageStatus returns the status of the image.
func (r *RemoteImageService) ImageStatus(image *runtimeapi.ImageSpec) (*runtimeapi.Image, error) {
ctx, cancel := getContextWithTimeout(r.timeout)
defer cancel()
resp, err := r.imageClient.ImageStatus(ctx, &runtimeapi.ImageStatusRequest{
Image: image,
})
if err != nil {
klog.Errorf("ImageStatus %q from image service failed: %v", image.Image, err)
return nil, err
}
if resp.Image != nil {
if resp.Image.Id == "" || resp.Image.Size_ == 0 {
errorMessage := fmt.Sprintf("Id or size of image %q is not set", image.Image)
klog.Errorf("ImageStatus failed: %s", errorMessage)
return nil, errors.New(errorMessage)
}
}
return resp.Image, nil
}
// PullImage pulls an image with authentication config.
func (r *RemoteImageService) PullImage(image *runtimeapi.ImageSpec, auth *runtimeapi.AuthConfig, podSandboxConfig *runtimeapi.PodSandboxConfig) (string, error) {
ctx, cancel := getContextWithCancel()
defer cancel()
resp, err := r.imageClient.PullImage(ctx, &runtimeapi.PullImageRequest{
Image: image,
Auth: auth,
SandboxConfig: podSandboxConfig,
})
if err != nil {
klog.Errorf("PullImage %q from image service failed: %v", image.Image, err)
return "", err
}
if resp.ImageRef == "" {
errorMessage := fmt.Sprintf("imageRef of image %q is not set", image.Image)
klog.Errorf("PullImage failed: %s", errorMessage)
return "", errors.New(errorMessage)
}
return resp.ImageRef, nil
}
// RemoveImage removes the image.
func (r *RemoteImageService) RemoveImage(image *runtimeapi.ImageSpec) error {
ctx, cancel := getContextWithTimeout(r.timeout)
defer cancel()
_, err := r.imageClient.RemoveImage(ctx, &runtimeapi.RemoveImageRequest{
Image: image,
})
if err != nil {
klog.Errorf("RemoveImage %q from image service failed: %v", image.Image, err)
return err
}
return nil
}
// ImageFsInfo returns information of the filesystem that is used to store images.
func (r *RemoteImageService) ImageFsInfo() ([]*runtimeapi.FilesystemUsage, error) {
// Do not set timeout, because `ImageFsInfo` takes time.
// TODO(random-liu): Should we assume runtime should cache the result, and set timeout here?
ctx, cancel := getContextWithCancel()
defer cancel()
resp, err := r.imageClient.ImageFsInfo(ctx, &runtimeapi.ImageFsInfoRequest{})
if err != nil {
klog.Errorf("ImageFsInfo from image service failed: %v", err)
return nil, err
}
return resp.GetImageFilesystems(), nil
}

View File

@ -0,0 +1,512 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package remote
import (
"context"
"errors"
"fmt"
"strings"
"time"
"google.golang.org/grpc"
"k8s.io/klog"
internalapi "k8s.io/cri-api/pkg/apis"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
"k8s.io/kubernetes/pkg/kubelet/util"
"k8s.io/kubernetes/pkg/kubelet/util/logreduction"
utilexec "k8s.io/utils/exec"
)
// RemoteRuntimeService is a gRPC implementation of internalapi.RuntimeService.
type RemoteRuntimeService struct {
timeout time.Duration
runtimeClient runtimeapi.RuntimeServiceClient
// Cache last per-container error message to reduce log spam
logReduction *logreduction.LogReduction
}
const (
// How frequently to report identical errors
identicalErrorDelay = 1 * time.Minute
)
// NewRemoteRuntimeService creates a new internalapi.RuntimeService.
func NewRemoteRuntimeService(endpoint string, connectionTimeout time.Duration) (internalapi.RuntimeService, error) {
klog.V(3).Infof("Connecting to runtime service %s", endpoint)
addr, dailer, err := util.GetAddressAndDialer(endpoint)
if err != nil {
return nil, err
}
ctx, cancel := context.WithTimeout(context.Background(), connectionTimeout)
defer cancel()
conn, err := grpc.DialContext(ctx, addr, grpc.WithInsecure(), grpc.WithDialer(dailer), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)))
if err != nil {
klog.Errorf("Connect remote runtime %s failed: %v", addr, err)
return nil, err
}
return &RemoteRuntimeService{
timeout: connectionTimeout,
runtimeClient: runtimeapi.NewRuntimeServiceClient(conn),
logReduction: logreduction.NewLogReduction(identicalErrorDelay),
}, nil
}
// Version returns the runtime name, runtime version and runtime API version.
func (r *RemoteRuntimeService) Version(apiVersion string) (*runtimeapi.VersionResponse, error) {
ctx, cancel := getContextWithTimeout(r.timeout)
defer cancel()
typedVersion, err := r.runtimeClient.Version(ctx, &runtimeapi.VersionRequest{
Version: apiVersion,
})
if err != nil {
klog.Errorf("Version from runtime service failed: %v", err)
return nil, err
}
if typedVersion.Version == "" || typedVersion.RuntimeName == "" || typedVersion.RuntimeApiVersion == "" || typedVersion.RuntimeVersion == "" {
return nil, fmt.Errorf("not all fields are set in VersionResponse (%q)", *typedVersion)
}
return typedVersion, err
}
// RunPodSandbox creates and starts a pod-level sandbox. Runtimes should ensure
// the sandbox is in ready state.
func (r *RemoteRuntimeService) RunPodSandbox(config *runtimeapi.PodSandboxConfig, runtimeHandler string) (string, error) {
// Use 2 times longer timeout for sandbox operation (4 mins by default)
// TODO: Make the pod sandbox timeout configurable.
ctx, cancel := getContextWithTimeout(r.timeout * 2)
defer cancel()
resp, err := r.runtimeClient.RunPodSandbox(ctx, &runtimeapi.RunPodSandboxRequest{
Config: config,
RuntimeHandler: runtimeHandler,
})
if err != nil {
klog.Errorf("RunPodSandbox from runtime service failed: %v", err)
return "", err
}
if resp.PodSandboxId == "" {
errorMessage := fmt.Sprintf("PodSandboxId is not set for sandbox %q", config.GetMetadata())
klog.Errorf("RunPodSandbox failed: %s", errorMessage)
return "", errors.New(errorMessage)
}
return resp.PodSandboxId, nil
}
// StopPodSandbox stops the sandbox. If there are any running containers in the
// sandbox, they should be forced to termination.
func (r *RemoteRuntimeService) StopPodSandbox(podSandBoxID string) error {
ctx, cancel := getContextWithTimeout(r.timeout)
defer cancel()
_, err := r.runtimeClient.StopPodSandbox(ctx, &runtimeapi.StopPodSandboxRequest{
PodSandboxId: podSandBoxID,
})
if err != nil {
klog.Errorf("StopPodSandbox %q from runtime service failed: %v", podSandBoxID, err)
return err
}
return nil
}
// RemovePodSandbox removes the sandbox. If there are any containers in the
// sandbox, they should be forcibly removed.
func (r *RemoteRuntimeService) RemovePodSandbox(podSandBoxID string) error {
ctx, cancel := getContextWithTimeout(r.timeout)
defer cancel()
_, err := r.runtimeClient.RemovePodSandbox(ctx, &runtimeapi.RemovePodSandboxRequest{
PodSandboxId: podSandBoxID,
})
if err != nil {
klog.Errorf("RemovePodSandbox %q from runtime service failed: %v", podSandBoxID, err)
return err
}
return nil
}
// PodSandboxStatus returns the status of the PodSandbox.
func (r *RemoteRuntimeService) PodSandboxStatus(podSandBoxID string) (*runtimeapi.PodSandboxStatus, error) {
ctx, cancel := getContextWithTimeout(r.timeout)
defer cancel()
resp, err := r.runtimeClient.PodSandboxStatus(ctx, &runtimeapi.PodSandboxStatusRequest{
PodSandboxId: podSandBoxID,
})
if err != nil {
return nil, err
}
if resp.Status != nil {
if err := verifySandboxStatus(resp.Status); err != nil {
return nil, err
}
}
return resp.Status, nil
}
// ListPodSandbox returns a list of PodSandboxes.
func (r *RemoteRuntimeService) ListPodSandbox(filter *runtimeapi.PodSandboxFilter) ([]*runtimeapi.PodSandbox, error) {
ctx, cancel := getContextWithTimeout(r.timeout)
defer cancel()
resp, err := r.runtimeClient.ListPodSandbox(ctx, &runtimeapi.ListPodSandboxRequest{
Filter: filter,
})
if err != nil {
klog.Errorf("ListPodSandbox with filter %+v from runtime service failed: %v", filter, err)
return nil, err
}
return resp.Items, nil
}
// CreateContainer creates a new container in the specified PodSandbox.
func (r *RemoteRuntimeService) CreateContainer(podSandBoxID string, config *runtimeapi.ContainerConfig, sandboxConfig *runtimeapi.PodSandboxConfig) (string, error) {
ctx, cancel := getContextWithTimeout(r.timeout)
defer cancel()
resp, err := r.runtimeClient.CreateContainer(ctx, &runtimeapi.CreateContainerRequest{
PodSandboxId: podSandBoxID,
Config: config,
SandboxConfig: sandboxConfig,
})
if err != nil {
klog.Errorf("CreateContainer in sandbox %q from runtime service failed: %v", podSandBoxID, err)
return "", err
}
if resp.ContainerId == "" {
errorMessage := fmt.Sprintf("ContainerId is not set for container %q", config.GetMetadata())
klog.Errorf("CreateContainer failed: %s", errorMessage)
return "", errors.New(errorMessage)
}
return resp.ContainerId, nil
}
// StartContainer starts the container.
func (r *RemoteRuntimeService) StartContainer(containerID string) error {
ctx, cancel := getContextWithTimeout(r.timeout)
defer cancel()
_, err := r.runtimeClient.StartContainer(ctx, &runtimeapi.StartContainerRequest{
ContainerId: containerID,
})
if err != nil {
klog.Errorf("StartContainer %q from runtime service failed: %v", containerID, err)
return err
}
return nil
}
// StopContainer stops a running container with a grace period (i.e., timeout).
func (r *RemoteRuntimeService) StopContainer(containerID string, timeout int64) error {
// Use timeout + default timeout (2 minutes) as timeout to leave extra time
// for SIGKILL container and request latency.
t := r.timeout + time.Duration(timeout)*time.Second
ctx, cancel := getContextWithTimeout(t)
defer cancel()
r.logReduction.ClearID(containerID)
_, err := r.runtimeClient.StopContainer(ctx, &runtimeapi.StopContainerRequest{
ContainerId: containerID,
Timeout: timeout,
})
if err != nil {
klog.Errorf("StopContainer %q from runtime service failed: %v", containerID, err)
return err
}
return nil
}
// RemoveContainer removes the container. If the container is running, the container
// should be forced to removal.
func (r *RemoteRuntimeService) RemoveContainer(containerID string) error {
ctx, cancel := getContextWithTimeout(r.timeout)
defer cancel()
r.logReduction.ClearID(containerID)
_, err := r.runtimeClient.RemoveContainer(ctx, &runtimeapi.RemoveContainerRequest{
ContainerId: containerID,
})
if err != nil {
klog.Errorf("RemoveContainer %q from runtime service failed: %v", containerID, err)
return err
}
return nil
}
// ListContainers lists containers by filters.
func (r *RemoteRuntimeService) ListContainers(filter *runtimeapi.ContainerFilter) ([]*runtimeapi.Container, error) {
ctx, cancel := getContextWithTimeout(r.timeout)
defer cancel()
resp, err := r.runtimeClient.ListContainers(ctx, &runtimeapi.ListContainersRequest{
Filter: filter,
})
if err != nil {
klog.Errorf("ListContainers with filter %+v from runtime service failed: %v", filter, err)
return nil, err
}
return resp.Containers, nil
}
// ContainerStatus returns the container status.
func (r *RemoteRuntimeService) ContainerStatus(containerID string) (*runtimeapi.ContainerStatus, error) {
ctx, cancel := getContextWithTimeout(r.timeout)
defer cancel()
resp, err := r.runtimeClient.ContainerStatus(ctx, &runtimeapi.ContainerStatusRequest{
ContainerId: containerID,
})
if err != nil {
// Don't spam the log with endless messages about the same failure.
if r.logReduction.ShouldMessageBePrinted(err.Error(), containerID) {
klog.Errorf("ContainerStatus %q from runtime service failed: %v", containerID, err)
}
return nil, err
}
r.logReduction.ClearID(containerID)
if resp.Status != nil {
if err := verifyContainerStatus(resp.Status); err != nil {
klog.Errorf("ContainerStatus of %q failed: %v", containerID, err)
return nil, err
}
}
return resp.Status, nil
}
// UpdateContainerResources updates a containers resource config
func (r *RemoteRuntimeService) UpdateContainerResources(containerID string, resources *runtimeapi.LinuxContainerResources) error {
ctx, cancel := getContextWithTimeout(r.timeout)
defer cancel()
_, err := r.runtimeClient.UpdateContainerResources(ctx, &runtimeapi.UpdateContainerResourcesRequest{
ContainerId: containerID,
Linux: resources,
})
if err != nil {
klog.Errorf("UpdateContainerResources %q from runtime service failed: %v", containerID, err)
return err
}
return nil
}
// ExecSync executes a command in the container, and returns the stdout output.
// If command exits with a non-zero exit code, an error is returned.
func (r *RemoteRuntimeService) ExecSync(containerID string, cmd []string, timeout time.Duration) (stdout []byte, stderr []byte, err error) {
// Do not set timeout when timeout is 0.
var ctx context.Context
var cancel context.CancelFunc
if timeout != 0 {
// Use timeout + default timeout (2 minutes) as timeout to leave some time for
// the runtime to do cleanup.
ctx, cancel = getContextWithTimeout(r.timeout + timeout)
} else {
ctx, cancel = getContextWithCancel()
}
defer cancel()
timeoutSeconds := int64(timeout.Seconds())
req := &runtimeapi.ExecSyncRequest{
ContainerId: containerID,
Cmd: cmd,
Timeout: timeoutSeconds,
}
resp, err := r.runtimeClient.ExecSync(ctx, req)
if err != nil {
klog.Errorf("ExecSync %s '%s' from runtime service failed: %v", containerID, strings.Join(cmd, " "), err)
return nil, nil, err
}
err = nil
if resp.ExitCode != 0 {
err = utilexec.CodeExitError{
Err: fmt.Errorf("command '%s' exited with %d: %s", strings.Join(cmd, " "), resp.ExitCode, resp.Stderr),
Code: int(resp.ExitCode),
}
}
return resp.Stdout, resp.Stderr, err
}
// Exec prepares a streaming endpoint to execute a command in the container, and returns the address.
func (r *RemoteRuntimeService) Exec(req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) {
ctx, cancel := getContextWithTimeout(r.timeout)
defer cancel()
resp, err := r.runtimeClient.Exec(ctx, req)
if err != nil {
klog.Errorf("Exec %s '%s' from runtime service failed: %v", req.ContainerId, strings.Join(req.Cmd, " "), err)
return nil, err
}
if resp.Url == "" {
errorMessage := "URL is not set"
klog.Errorf("Exec failed: %s", errorMessage)
return nil, errors.New(errorMessage)
}
return resp, nil
}
// Attach prepares a streaming endpoint to attach to a running container, and returns the address.
func (r *RemoteRuntimeService) Attach(req *runtimeapi.AttachRequest) (*runtimeapi.AttachResponse, error) {
ctx, cancel := getContextWithTimeout(r.timeout)
defer cancel()
resp, err := r.runtimeClient.Attach(ctx, req)
if err != nil {
klog.Errorf("Attach %s from runtime service failed: %v", req.ContainerId, err)
return nil, err
}
if resp.Url == "" {
errorMessage := "URL is not set"
klog.Errorf("Exec failed: %s", errorMessage)
return nil, errors.New(errorMessage)
}
return resp, nil
}
// PortForward prepares a streaming endpoint to forward ports from a PodSandbox, and returns the address.
func (r *RemoteRuntimeService) PortForward(req *runtimeapi.PortForwardRequest) (*runtimeapi.PortForwardResponse, error) {
ctx, cancel := getContextWithTimeout(r.timeout)
defer cancel()
resp, err := r.runtimeClient.PortForward(ctx, req)
if err != nil {
klog.Errorf("PortForward %s from runtime service failed: %v", req.PodSandboxId, err)
return nil, err
}
if resp.Url == "" {
errorMessage := "URL is not set"
klog.Errorf("Exec failed: %s", errorMessage)
return nil, errors.New(errorMessage)
}
return resp, nil
}
// UpdateRuntimeConfig updates the config of a runtime service. The only
// update payload currently supported is the pod CIDR assigned to a node,
// and the runtime service just proxies it down to the network plugin.
func (r *RemoteRuntimeService) UpdateRuntimeConfig(runtimeConfig *runtimeapi.RuntimeConfig) error {
ctx, cancel := getContextWithTimeout(r.timeout)
defer cancel()
// Response doesn't contain anything of interest. This translates to an
// Event notification to the network plugin, which can't fail, so we're
// really looking to surface destination unreachable.
_, err := r.runtimeClient.UpdateRuntimeConfig(ctx, &runtimeapi.UpdateRuntimeConfigRequest{
RuntimeConfig: runtimeConfig,
})
if err != nil {
return err
}
return nil
}
// Status returns the status of the runtime.
func (r *RemoteRuntimeService) Status() (*runtimeapi.RuntimeStatus, error) {
ctx, cancel := getContextWithTimeout(r.timeout)
defer cancel()
resp, err := r.runtimeClient.Status(ctx, &runtimeapi.StatusRequest{})
if err != nil {
klog.Errorf("Status from runtime service failed: %v", err)
return nil, err
}
if resp.Status == nil || len(resp.Status.Conditions) < 2 {
errorMessage := "RuntimeReady or NetworkReady condition are not set"
klog.Errorf("Status failed: %s", errorMessage)
return nil, errors.New(errorMessage)
}
return resp.Status, nil
}
// ContainerStats returns the stats of the container.
func (r *RemoteRuntimeService) ContainerStats(containerID string) (*runtimeapi.ContainerStats, error) {
ctx, cancel := getContextWithTimeout(r.timeout)
defer cancel()
resp, err := r.runtimeClient.ContainerStats(ctx, &runtimeapi.ContainerStatsRequest{
ContainerId: containerID,
})
if err != nil {
if r.logReduction.ShouldMessageBePrinted(err.Error(), containerID) {
klog.Errorf("ContainerStatus %q from runtime service failed: %v", containerID, err)
}
return nil, err
}
r.logReduction.ClearID(containerID)
return resp.GetStats(), nil
}
func (r *RemoteRuntimeService) ListContainerStats(filter *runtimeapi.ContainerStatsFilter) ([]*runtimeapi.ContainerStats, error) {
// Do not set timeout, because writable layer stats collection takes time.
// TODO(random-liu): Should we assume runtime should cache the result, and set timeout here?
ctx, cancel := getContextWithCancel()
defer cancel()
resp, err := r.runtimeClient.ListContainerStats(ctx, &runtimeapi.ListContainerStatsRequest{
Filter: filter,
})
if err != nil {
klog.Errorf("ListContainerStats with filter %+v from runtime service failed: %v", filter, err)
return nil, err
}
return resp.GetStats(), nil
}
func (r *RemoteRuntimeService) ReopenContainerLog(containerID string) error {
ctx, cancel := getContextWithTimeout(r.timeout)
defer cancel()
_, err := r.runtimeClient.ReopenContainerLog(ctx, &runtimeapi.ReopenContainerLogRequest{ContainerId: containerID})
if err != nil {
klog.Errorf("ReopenContainerLog %q from runtime service failed: %v", containerID, err)
return err
}
return nil
}

91
vendor/k8s.io/kubernetes/pkg/kubelet/remote/utils.go generated vendored Normal file
View File

@ -0,0 +1,91 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package remote
import (
"context"
"fmt"
"time"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
)
// maxMsgSize use 16MB as the default message size limit.
// grpc library default is 4MB
const maxMsgSize = 1024 * 1024 * 16
// getContextWithTimeout returns a context with timeout.
func getContextWithTimeout(timeout time.Duration) (context.Context, context.CancelFunc) {
return context.WithTimeout(context.Background(), timeout)
}
// getContextWithCancel returns a context with cancel.
func getContextWithCancel() (context.Context, context.CancelFunc) {
return context.WithCancel(context.Background())
}
// verifySandboxStatus verified whether all required fields are set in PodSandboxStatus.
func verifySandboxStatus(status *runtimeapi.PodSandboxStatus) error {
if status.Id == "" {
return fmt.Errorf("Id is not set")
}
if status.Metadata == nil {
return fmt.Errorf("Metadata is not set")
}
metadata := status.Metadata
if metadata.Name == "" || metadata.Namespace == "" || metadata.Uid == "" {
return fmt.Errorf("Name, Namespace or Uid is not in metadata %q", metadata)
}
if status.CreatedAt == 0 {
return fmt.Errorf("CreatedAt is not set")
}
return nil
}
// verifyContainerStatus verified whether all required fields are set in ContainerStatus.
func verifyContainerStatus(status *runtimeapi.ContainerStatus) error {
if status.Id == "" {
return fmt.Errorf("Id is not set")
}
if status.Metadata == nil {
return fmt.Errorf("Metadata is not set")
}
metadata := status.Metadata
if metadata.Name == "" {
return fmt.Errorf("Name is not in metadata %q", metadata)
}
if status.CreatedAt == 0 {
return fmt.Errorf("CreatedAt is not set")
}
if status.Image == nil || status.Image.Image == "" {
return fmt.Errorf("Image is not set")
}
if status.ImageRef == "" {
return fmt.Errorf("ImageRef is not set")
}
return nil
}

110
vendor/k8s.io/kubernetes/pkg/kubelet/util/BUILD generated vendored Normal file
View File

@ -0,0 +1,110 @@
package(default_visibility = ["//visibility:public"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
go_test(
name = "go_default_test",
srcs = [
"util_unix_test.go",
"util_windows_test.go",
],
embed = [":go_default_library"],
deps = select({
"@io_bazel_rules_go//go/platform:android": [
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/github.com/stretchr/testify/require:go_default_library",
],
"@io_bazel_rules_go//go/platform:darwin": [
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/github.com/stretchr/testify/require:go_default_library",
],
"@io_bazel_rules_go//go/platform:freebsd": [
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/github.com/stretchr/testify/require:go_default_library",
],
"@io_bazel_rules_go//go/platform:ios": [
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/github.com/stretchr/testify/require:go_default_library",
],
"@io_bazel_rules_go//go/platform:linux": [
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/github.com/stretchr/testify/require:go_default_library",
],
"@io_bazel_rules_go//go/platform:windows": [
"//vendor/github.com/Microsoft/go-winio:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/github.com/stretchr/testify/require:go_default_library",
],
"//conditions:default": [],
}),
)
go_library(
name = "go_default_library",
srcs = [
"boottime_util_darwin.go",
"boottime_util_linux.go",
"doc.go",
"util.go",
"util_unix.go",
"util_unsupported.go",
"util_windows.go",
],
importpath = "k8s.io/kubernetes/pkg/kubelet/util",
deps = [
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
] + select({
"@io_bazel_rules_go//go/platform:android": [
"//vendor/golang.org/x/sys/unix:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
"@io_bazel_rules_go//go/platform:darwin": [
"//vendor/golang.org/x/sys/unix:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
"@io_bazel_rules_go//go/platform:freebsd": [
"//vendor/golang.org/x/sys/unix:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
"@io_bazel_rules_go//go/platform:ios": [
"//vendor/golang.org/x/sys/unix:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
"@io_bazel_rules_go//go/platform:linux": [
"//vendor/golang.org/x/sys/unix:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
"@io_bazel_rules_go//go/platform:windows": [
"//vendor/github.com/Microsoft/go-winio:go_default_library",
],
"//conditions:default": [],
}),
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [
":package-srcs",
"//pkg/kubelet/util/cache:all-srcs",
"//pkg/kubelet/util/format:all-srcs",
"//pkg/kubelet/util/ioutils:all-srcs",
"//pkg/kubelet/util/logreduction:all-srcs",
"//pkg/kubelet/util/manager:all-srcs",
"//pkg/kubelet/util/queue:all-srcs",
"//pkg/kubelet/util/sliceutils:all-srcs",
"//pkg/kubelet/util/store:all-srcs",
],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -0,0 +1,44 @@
// +build darwin
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"fmt"
"syscall"
"time"
"unsafe"
"golang.org/x/sys/unix"
)
// GetBootTime returns the time at which the machine was started, truncated to the nearest second
func GetBootTime() (time.Time, error) {
output, err := unix.SysctlRaw("kern.boottime")
if err != nil {
return time.Time{}, err
}
var timeval syscall.Timeval
if len(output) != int(unsafe.Sizeof(timeval)) {
return time.Time{}, fmt.Errorf("unexpected output when calling syscall kern.bootime. Expected len(output) to be %v, but got %v",
int(unsafe.Sizeof(timeval)), len(output))
}
timeval = *(*syscall.Timeval)(unsafe.Pointer(&output[0]))
sec, nsec := timeval.Unix()
return time.Unix(sec, nsec).Truncate(time.Second), nil
}

View File

@ -0,0 +1,36 @@
// +build freebsd linux
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"fmt"
"time"
"golang.org/x/sys/unix"
)
// GetBootTime returns the time at which the machine was started, truncated to the nearest second
func GetBootTime() (time.Time, error) {
currentTime := time.Now()
var info unix.Sysinfo_t
if err := unix.Sysinfo(&info); err != nil {
return time.Time{}, fmt.Errorf("error getting system uptime: %s", err)
}
return currentTime.Add(-time.Duration(info.Uptime) * time.Second).Truncate(time.Second), nil
}

18
vendor/k8s.io/kubernetes/pkg/kubelet/util/doc.go generated vendored Normal file
View File

@ -0,0 +1,18 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package util holds utility functions.
package util // import "k8s.io/kubernetes/pkg/kubelet/util"

View File

@ -0,0 +1,28 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = ["logreduction.go"],
importpath = "k8s.io/kubernetes/pkg/kubelet/util/logreduction",
visibility = ["//visibility:public"],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)
go_test(
name = "go_default_test",
srcs = ["logreduction_test.go"],
embed = [":go_default_library"],
)

View File

@ -0,0 +1,78 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package logreduction
import (
"sync"
"time"
)
var nowfunc = func() time.Time { return time.Now() }
// LogReduction provides a filter for consecutive identical log messages;
// a message will be printed no more than once per interval.
// If a string of messages is interrupted by a different message,
// the interval timer will be reset.
type LogReduction struct {
lastError map[string]string
errorPrinted map[string]time.Time
errorMapLock sync.Mutex
identicalErrorDelay time.Duration
}
// NewLogReduction returns an initialized LogReduction
func NewLogReduction(identicalErrorDelay time.Duration) *LogReduction {
l := new(LogReduction)
l.lastError = make(map[string]string)
l.errorPrinted = make(map[string]time.Time)
l.identicalErrorDelay = identicalErrorDelay
return l
}
func (l *LogReduction) cleanupErrorTimeouts() {
for name, timeout := range l.errorPrinted {
if nowfunc().Sub(timeout) >= l.identicalErrorDelay {
delete(l.errorPrinted, name)
delete(l.lastError, name)
}
}
}
// ShouldMessageBePrinted determines whether a message should be printed based
// on how long ago this particular message was last printed
func (l *LogReduction) ShouldMessageBePrinted(message string, parentID string) bool {
l.errorMapLock.Lock()
defer l.errorMapLock.Unlock()
l.cleanupErrorTimeouts()
lastMsg, ok := l.lastError[parentID]
lastPrinted, ok1 := l.errorPrinted[parentID]
if !ok || !ok1 || message != lastMsg || nowfunc().Sub(lastPrinted) >= l.identicalErrorDelay {
l.errorPrinted[parentID] = nowfunc()
l.lastError[parentID] = message
return true
}
return false
}
// ClearID clears out log reduction records pertaining to a particular parent
// (e. g. container ID)
func (l *LogReduction) ClearID(parentID string) {
l.errorMapLock.Lock()
defer l.errorMapLock.Unlock()
delete(l.lastError, parentID)
delete(l.errorPrinted, parentID)
}

27
vendor/k8s.io/kubernetes/pkg/kubelet/util/util.go generated vendored Normal file
View File

@ -0,0 +1,27 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// FromApiserverCache modifies <opts> so that the GET request will
// be served from apiserver cache instead of from etcd.
func FromApiserverCache(opts *metav1.GetOptions) {
opts.ResourceVersion = "0"
}

154
vendor/k8s.io/kubernetes/pkg/kubelet/util/util_unix.go generated vendored Normal file
View File

@ -0,0 +1,154 @@
// +build freebsd linux darwin
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"fmt"
"io/ioutil"
"net"
"net/url"
"os"
"path/filepath"
"time"
"golang.org/x/sys/unix"
"k8s.io/klog"
)
const (
// unixProtocol is the network protocol of unix socket.
unixProtocol = "unix"
)
// CreateListener creates a listener on the specified endpoint.
func CreateListener(endpoint string) (net.Listener, error) {
protocol, addr, err := parseEndpointWithFallbackProtocol(endpoint, unixProtocol)
if err != nil {
return nil, err
}
if protocol != unixProtocol {
return nil, fmt.Errorf("only support unix socket endpoint")
}
// Unlink to cleanup the previous socket file.
err = unix.Unlink(addr)
if err != nil && !os.IsNotExist(err) {
return nil, fmt.Errorf("failed to unlink socket file %q: %v", addr, err)
}
if err := os.MkdirAll(filepath.Dir(addr), 0750); err != nil {
return nil, fmt.Errorf("error creating socket directory %q: %v", filepath.Dir(addr), err)
}
// Create the socket on a tempfile and move it to the destination socket to handle improprer cleanup
file, err := ioutil.TempFile(filepath.Dir(addr), "")
if err != nil {
return nil, fmt.Errorf("failed to create temporary file: %v", err)
}
if err := os.Remove(file.Name()); err != nil {
return nil, fmt.Errorf("failed to remove temporary file: %v", err)
}
l, err := net.Listen(protocol, file.Name())
if err != nil {
return nil, err
}
if err = os.Rename(file.Name(), addr); err != nil {
return nil, fmt.Errorf("failed to move temporary file to addr %q: %v", addr, err)
}
return l, nil
}
// GetAddressAndDialer returns the address parsed from the given endpoint and a dialer.
func GetAddressAndDialer(endpoint string) (string, func(addr string, timeout time.Duration) (net.Conn, error), error) {
protocol, addr, err := parseEndpointWithFallbackProtocol(endpoint, unixProtocol)
if err != nil {
return "", nil, err
}
if protocol != unixProtocol {
return "", nil, fmt.Errorf("only support unix socket endpoint")
}
return addr, dial, nil
}
func dial(addr string, timeout time.Duration) (net.Conn, error) {
return net.DialTimeout(unixProtocol, addr, timeout)
}
func parseEndpointWithFallbackProtocol(endpoint string, fallbackProtocol string) (protocol string, addr string, err error) {
if protocol, addr, err = parseEndpoint(endpoint); err != nil && protocol == "" {
fallbackEndpoint := fallbackProtocol + "://" + endpoint
protocol, addr, err = parseEndpoint(fallbackEndpoint)
if err == nil {
klog.Warningf("Using %q as endpoint is deprecated, please consider using full url format %q.", endpoint, fallbackEndpoint)
}
}
return
}
func parseEndpoint(endpoint string) (string, string, error) {
u, err := url.Parse(endpoint)
if err != nil {
return "", "", err
}
switch u.Scheme {
case "tcp":
return "tcp", u.Host, nil
case "unix":
return "unix", u.Path, nil
case "":
return "", "", fmt.Errorf("using %q as endpoint is deprecated, please consider using full url format", endpoint)
default:
return u.Scheme, "", fmt.Errorf("protocol %q not supported", u.Scheme)
}
}
// LocalEndpoint returns the full path to a unix socket at the given endpoint
func LocalEndpoint(path, file string) (string, error) {
u := url.URL{
Scheme: unixProtocol,
Path: path,
}
return filepath.Join(u.String(), file+".sock"), nil
}
// IsUnixDomainSocket returns whether a given file is a AF_UNIX socket file
func IsUnixDomainSocket(filePath string) (bool, error) {
fi, err := os.Stat(filePath)
if err != nil {
return false, fmt.Errorf("stat file %s failed: %v", filePath, err)
}
if fi.Mode()&os.ModeSocket == 0 {
return false, nil
}
return true, nil
}
// NormalizePath is a no-op for Linux for now
func NormalizePath(path string) string {
return path
}

View File

@ -0,0 +1,54 @@
// +build !freebsd,!linux,!windows,!darwin
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"fmt"
"net"
"time"
)
// CreateListener creates a listener on the specified endpoint.
func CreateListener(endpoint string) (net.Listener, error) {
return nil, fmt.Errorf("CreateListener is unsupported in this build")
}
// GetAddressAndDialer returns the address parsed from the given endpoint and a dialer.
func GetAddressAndDialer(endpoint string) (string, func(addr string, timeout time.Duration) (net.Conn, error), error) {
return "", nil, fmt.Errorf("GetAddressAndDialer is unsupported in this build")
}
// LockAndCheckSubPath empty implementation
func LockAndCheckSubPath(volumePath, subPath string) ([]uintptr, error) {
return []uintptr{}, nil
}
// UnlockPath empty implementation
func UnlockPath(fileHandles []uintptr) {
}
// LocalEndpoint empty implementation
func LocalEndpoint(path, file string) (string, error) {
return "", fmt.Errorf("LocalEndpoints are unsupported in this build")
}
// GetBootTime empty implementation
func GetBootTime() (time.Time, error) {
return time.Time{}, fmt.Errorf("GetBootTime is unsupported in this build")
}

View File

@ -0,0 +1,153 @@
// +build windows
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"fmt"
"net"
"net/url"
"strings"
"syscall"
"time"
"github.com/Microsoft/go-winio"
)
const (
tcpProtocol = "tcp"
npipeProtocol = "npipe"
)
// CreateListener creates a listener on the specified endpoint.
func CreateListener(endpoint string) (net.Listener, error) {
protocol, addr, err := parseEndpoint(endpoint)
if err != nil {
return nil, err
}
switch protocol {
case tcpProtocol:
return net.Listen(tcpProtocol, addr)
case npipeProtocol:
return winio.ListenPipe(addr, nil)
default:
return nil, fmt.Errorf("only support tcp and npipe endpoint")
}
}
// GetAddressAndDialer returns the address parsed from the given endpoint and a dialer.
func GetAddressAndDialer(endpoint string) (string, func(addr string, timeout time.Duration) (net.Conn, error), error) {
protocol, addr, err := parseEndpoint(endpoint)
if err != nil {
return "", nil, err
}
if protocol == tcpProtocol {
return addr, tcpDial, nil
}
if protocol == npipeProtocol {
return addr, npipeDial, nil
}
return "", nil, fmt.Errorf("only support tcp and npipe endpoint")
}
func tcpDial(addr string, timeout time.Duration) (net.Conn, error) {
return net.DialTimeout(tcpProtocol, addr, timeout)
}
func npipeDial(addr string, timeout time.Duration) (net.Conn, error) {
return winio.DialPipe(addr, &timeout)
}
func parseEndpoint(endpoint string) (string, string, error) {
// url.Parse doesn't recognize \, so replace with / first.
endpoint = strings.Replace(endpoint, "\\", "/", -1)
u, err := url.Parse(endpoint)
if err != nil {
return "", "", err
}
if u.Scheme == "tcp" {
return "tcp", u.Host, nil
} else if u.Scheme == "npipe" {
if strings.HasPrefix(u.Path, "//./pipe") {
return "npipe", u.Path, nil
}
// fallback host if not provided.
host := u.Host
if host == "" {
host = "."
}
return "npipe", fmt.Sprintf("//%s%s", host, u.Path), nil
} else if u.Scheme == "" {
return "", "", fmt.Errorf("Using %q as endpoint is deprecated, please consider using full url format", endpoint)
} else {
return u.Scheme, "", fmt.Errorf("protocol %q not supported", u.Scheme)
}
}
// LocalEndpoint empty implementation
func LocalEndpoint(path, file string) (string, error) {
return "", fmt.Errorf("LocalEndpoints are unsupported in this build")
}
var tickCount = syscall.NewLazyDLL("kernel32.dll").NewProc("GetTickCount64")
// GetBootTime returns the time at which the machine was started, truncated to the nearest second
func GetBootTime() (time.Time, error) {
currentTime := time.Now()
output, _, err := tickCount.Call()
if errno, ok := err.(syscall.Errno); !ok || errno != 0 {
return time.Time{}, err
}
return currentTime.Add(-time.Duration(output) * time.Millisecond).Truncate(time.Second), nil
}
// IsUnixDomainSocket returns whether a given file is a AF_UNIX socket file
func IsUnixDomainSocket(filePath string) (bool, error) {
// Due to the absence of golang support for os.ModeSocket in Windows (https://github.com/golang/go/issues/33357)
// we need to dial the file and check if we receive an error to determine if a file is Unix Domain Socket file.
// Note that querrying for the Reparse Points (https://docs.microsoft.com/en-us/windows/win32/fileio/reparse-points)
// for the file (using FSCTL_GET_REPARSE_POINT) and checking for reparse tag: reparseTagSocket
// does NOT work in 1809 if the socket file is created within a bind mounted directory by a container
// and the FSCTL is issued in the host by the kubelet.
c, err := net.Dial("unix", filePath)
if err == nil {
c.Close()
return true, nil
}
return false, nil
}
// NormalizePath converts FS paths returned by certain go frameworks (like fsnotify)
// to native Windows paths that can be passed to Windows specific code
func NormalizePath(path string) string {
path = strings.ReplaceAll(path, "/", "\\")
if strings.HasPrefix(path, "\\") {
path = "c:" + path
}
return path
}

5
vendor/modules.txt vendored
View File

@ -150,7 +150,6 @@ github.com/docker/docker/api/types/time
github.com/docker/docker/api/types/versions
github.com/docker/docker/api/types/volume
github.com/docker/docker/client
github.com/docker/docker/errdefs
github.com/docker/docker/pkg/jsonmessage
github.com/docker/docker/pkg/term
github.com/docker/docker/pkg/term/windows
@ -912,6 +911,7 @@ k8s.io/component-base/metrics
k8s.io/component-base/metrics/legacyregistry
k8s.io/component-base/version
# k8s.io/cri-api v0.16.6 => k8s.io/cri-api v0.16.6
k8s.io/cri-api/pkg/apis
k8s.io/cri-api/pkg/apis/runtime/v1alpha2
# k8s.io/csi-translation-lib v0.16.6 => k8s.io/csi-translation-lib v0.16.6
k8s.io/csi-translation-lib
@ -971,7 +971,10 @@ k8s.io/kubernetes/pkg/credentialprovider
k8s.io/kubernetes/pkg/credentialprovider/secrets
k8s.io/kubernetes/pkg/features
k8s.io/kubernetes/pkg/fieldpath
k8s.io/kubernetes/pkg/kubelet/remote
k8s.io/kubernetes/pkg/kubelet/types
k8s.io/kubernetes/pkg/kubelet/util
k8s.io/kubernetes/pkg/kubelet/util/logreduction
k8s.io/kubernetes/pkg/master/ports
k8s.io/kubernetes/pkg/scheduler/algorithm
k8s.io/kubernetes/pkg/scheduler/algorithm/predicates