diff --git a/go.mod b/go.mod
index fbf2ac1e5..630212615 100644
--- a/go.mod
+++ b/go.mod
@@ -13,6 +13,8 @@ require (
 	github.com/go-echarts/statsview v0.3.4
 	github.com/go-http-utils/headers v0.0.0-20181008091004-fed159eddc2a
 	github.com/go-openapi/spec v0.20.3 // indirect
+	github.com/go-redis/cache/v8 v8.4.1
+	github.com/go-redis/redis/v8 v8.9.0
 	github.com/gofrs/flock v0.8.0
 	github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e
 	github.com/golang/mock v1.5.0
@@ -30,7 +32,6 @@ require (
 	github.com/russross/blackfriday/v2 v2.1.0 // indirect
 	github.com/satori/go.uuid v1.2.0 // indirect
 	github.com/serialx/hashring v0.0.0-20200727003509-22c0c7ab6b1b
-	github.com/shirou/gopsutil/v3 v3.21.4
 	github.com/sirupsen/logrus v1.2.0
 	github.com/spf13/afero v1.2.2 // indirect
 	github.com/spf13/cobra v1.1.1
@@ -39,8 +40,6 @@ require (
 	github.com/swaggo/files v0.0.0-20190704085106-630677cd5c14
 	github.com/swaggo/gin-swagger v1.3.0
 	github.com/swaggo/swag v1.7.0
-	github.com/valyala/fasthttp v1.22.0
-	github.com/willf/bitset v1.1.11
 	github.com/xo/dburl v0.7.0
 	go.opentelemetry.io/otel v0.20.0
 	go.opentelemetry.io/otel/exporters/trace/jaeger v0.20.0
@@ -48,7 +47,7 @@ require (
 	go.opentelemetry.io/otel/trace v0.20.0
 	go.uber.org/atomic v1.6.0
 	go.uber.org/zap v1.16.0
-	golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2
+	golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2 // indirect
 	golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
 	golang.org/x/sys v0.0.0-20210309074719-68d13333faf2 // indirect
 	golang.org/x/term v0.0.0-20201210144234-2321bbc49cbf
diff --git a/go.sum b/go.sum
index 7986926f7..ffbab14a7 100644
--- a/go.sum
+++ b/go.sum
@@ -11,6 +11,7 @@ cloud.google.com/go/firestore v1.1.0/go.mod h1:ulACoGHTpvq5r8rxGJ4ddJZBZqakUQqCl
 cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2kNxGRt3I=
 cloud.google.com/go/storage v1.0.0/go.mod h1:IhtSnM/ZTZV8YYJWCY8RULGVqBDmpoyjwiyrjsg+URw=
 dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
+dmitri.shuralyov.com/gpu/mtl v0.0.0-20201218220906-28db891af037/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
 github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
 github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
@@ -25,16 +26,12 @@ github.com/PuerkitoBio/purell v1.1.1 h1:WEQqlqaGbrPkxLJWfBwQmfEAE1Z7ONdDLqrN38tN
 github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0=
 github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 h1:d+Bc7a5rLufV/sSk/8dngufqelfh6jnri85riMAaF/M=
 github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE=
-github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d h1:G0m3OIz70MZUWq3EgK3CesDbo8upS2Vm9/P3FtgI+Jk=
-github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
 github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
 github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafoB+tBA3gMyHYHrpOtNuDiK/uB5uXxq5wM=
 github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
 github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod
h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/aliyun/aliyun-oss-go-sdk v2.1.6+incompatible h1:Ft+KeWIJxFP76LqgJbvtOA1qBIoC8vGkTV3QeCOeJC4= github.com/aliyun/aliyun-oss-go-sdk v2.1.6+incompatible/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8= -github.com/andybalholm/brotli v1.0.1 h1:KqhlKozYbRtJvsPrrEeXcO+N2l6NYT5A2QAFmSULpEc= -github.com/andybalholm/brotli v1.0.1/go.mod h1:loMXtMfwqflxFJPmdbJO0a3KNoPuLBgiu3qAvBg8x/Y= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= @@ -46,7 +43,10 @@ github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+Ce github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= +github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY= +github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk= @@ -62,6 +62,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= github.com/docker/go-units v0.4.0 h1:3uh0PgVws3nIA0Q+MwDC8yjEPf9zjRfZZWXZYDct3Tw= github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= @@ -97,6 +99,7 @@ github.com/go-echarts/go-echarts/v2 v2.2.3/go.mod h1:6TOomEztzGDVDkOSCFBq3ed7xOY github.com/go-echarts/statsview v0.3.4 h1:CCuytRAutdnF901NrR4BzSjHXjUp8OyA3/iopgG/1/Y= github.com/go-echarts/statsview v0.3.4/go.mod h1:AehKjL9cTFMeIo5QdV8sQO43vFmfY65X5GMWa3XMciY= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= +github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= github.com/go-http-utils/headers v0.0.0-20181008091004-fed159eddc2a h1:v6zMvHuY9yue4+QkG/HQ/W67wvtQmWJ4SDo9aK/GIno= github.com/go-http-utils/headers v0.0.0-20181008091004-fed159eddc2a/go.mod h1:I79BieaU4fxrw4LMXby6q5OS9XnoR9UIKLOzDFjUmuw= github.com/go-kit/kit v0.8.0/go.mod 
h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= @@ -105,8 +108,6 @@ github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas= github.com/go-logr/logr v0.2.0 h1:QvGt2nLcHH0WK9orKa+ppBPAxREcH364nPUedEpK0TY= github.com/go-logr/logr v0.2.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU= -github.com/go-ole/go-ole v1.2.4 h1:nNBDSCOigTSiarFpYE9J/KtEA1IOW4CNeqT9TQDqCxI= -github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM= github.com/go-openapi/jsonpointer v0.17.0/go.mod h1:cOnomiV+CVVwFLk0A/MExoFMjwdsUdVpsRhURCKh+3M= github.com/go-openapi/jsonpointer v0.19.2/go.mod h1:3akKfEdA7DF1sugOqz1dVQHBcuDBPKZGEoHC/NkiQRg= github.com/go-openapi/jsonpointer v0.19.3/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg= @@ -138,6 +139,11 @@ github.com/go-playground/universal-translator v0.17.0 h1:icxd5fm+REJzpZx7ZfpaD87 github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+Scu5vgOQjsIJAF8j9muTVoKLVtA= github.com/go-playground/validator/v10 v10.2.0 h1:KgJ0snyC2R9VXYN2rneOtQcw5aHQB1Vv0sFl1UcHBOY= github.com/go-playground/validator/v10 v10.2.0/go.mod h1:uOYAAleCW8F/7oMFd6aG0GOhaH6EGOAJShg8Id5JGkI= +github.com/go-redis/cache/v8 v8.4.1 h1:jq0fw7hcUJzsoS9qPYjXzcAT/rm19TRMB1vEMG3m950= +github.com/go-redis/cache/v8 v8.4.1/go.mod h1:iyYQNUxMsz6cPfTX3h4sT4lUmDXV0mDuEyeAn2o1btI= +github.com/go-redis/redis/v8 v8.4.4/go.mod h1:nA0bQuF0i5JFx4Ta9RZxGKXFrQ8cRWntra97f0196iY= +github.com/go-redis/redis/v8 v8.9.0 h1:FTTbB7WqlXfVNdVv0SsxA+oVi0bAwit6bMe3IUucq2o= +github.com/go-redis/redis/v8 v8.9.0/go.mod h1:ik7vb7+gm8Izylxu6kf6wG26/t2VljgCfSQ1DM4O1uU= github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gGcHOs= github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= @@ -168,6 +174,8 @@ github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QD github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.4.3 h1:JjCZWpVbqXDqFVmTfYWEVTMIYrL/NPdPSCHPJ0T/raM= github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA= +github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= @@ -176,6 +184,7 @@ github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= @@ -243,8 +252,8 @@ 
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7V github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.11.8 h1:difgzQsp5mdAz9v8lm3P/I+EpDKMU/6uTMw1y1FObuo= -github.com/klauspost/compress v1.11.8/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= +github.com/klauspost/compress v1.12.2 h1:2KCfW3I9M7nSc5wOqXAlW2v2U6v+w6cbjvbfp+OykW8= +github.com/klauspost/compress v1.12.2/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= @@ -300,14 +309,25 @@ github.com/munnerz/goautoneg v0.0.0-20120707110453-a547fc61f48d/go.mod h1:+n7T8m github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= +github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78= +github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec= github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY= github.com/onsi/ginkgo v0.0.0-20170829012221-11459a886d9c/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.11.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= +github.com/onsi/ginkgo v1.14.2/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY= +github.com/onsi/ginkgo v1.15.0 h1:1V1NfVQR87RtWAgp1lv9JZJ5Jap+XFGKPi00andXGi4= +github.com/onsi/ginkgo v1.15.0/go.mod h1:hF8qUzuuC8DJGygJH3726JnCZX4MYbRB8yFfISqnKUg= github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA= github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= +github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= +github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= +github.com/onsi/gomega v1.10.4/go.mod h1:g/HbgYopi++010VEqkFgJHKC09uJiW9UkXvMUuKHUCQ= +github.com/onsi/gomega v1.10.5 h1:7n6FEkpFmfCoo2t+YYqXH0evK+a9ICQz0xcAy9dYcaQ= +github.com/onsi/gomega v1.10.5/go.mod h1:gza4q3jKQJijlu05nKWRCW/GavJumGt8aNRxWg7mt48= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pborman/uuid v1.2.1 h1:+ZZIw58t/ozdjRaXh/3awHfmWRbzYxJoAdNJxe/3pvw= github.com/pborman/uuid v1.2.1/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k= @@ -345,8 +365,6 @@ github.com/satori/go.uuid v1.2.0/go.mod 
h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdh github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= github.com/serialx/hashring v0.0.0-20200727003509-22c0c7ab6b1b h1:h+3JX2VoWTFuyQEo87pStk/a99dzIO1mM9KxIyLPGTU= github.com/serialx/hashring v0.0.0-20200727003509-22c0c7ab6b1b/go.mod h1:/yeG0My1xr/u+HZrFQ1tOQQQQrOawfyMUH13ai5brBc= -github.com/shirou/gopsutil/v3 v3.21.4 h1:XB/+p+kVnyYLuPHCfa99lxz2aJyvVhnyd+FxZqH/k7M= -github.com/shirou/gopsutil/v3 v3.21.4/go.mod h1:ghfMypLDrFSWN2c9cDYFLHyynQ+QUht0cv/18ZqVczw= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= github.com/sirupsen/logrus v1.2.0 h1:juTguoYk5qI21pwyTXY3B3Y5cOTH3ZUyZCg1v/mihuo= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= @@ -393,10 +411,6 @@ github.com/swaggo/gin-swagger v1.3.0/go.mod h1:oy1BRA6WvgtCp848lhxce7BnWH4C8Bxa0 github.com/swaggo/swag v1.5.1/go.mod h1:1Bl9F/ZBpVWh22nY0zmYyASPO1lI/zIwRDrpZU+tv8Y= github.com/swaggo/swag v1.7.0 h1:5bCA/MTLQoIqDXXyHfOpMeDvL9j68OY/udlK4pQoo4E= github.com/swaggo/swag v1.7.0/go.mod h1:BdPIL73gvS9NBsdi7M1JOxLvlbfvNRaBP8m6WT6Aajo= -github.com/tklauser/go-sysconf v0.3.4 h1:HT8SVixZd3IzLdfs/xlpq0jeSfTX57g1v6wB1EuzV7M= -github.com/tklauser/go-sysconf v0.3.4/go.mod h1:Cl2c8ZRWfHD5IrfHo9VN+FX9kCFjIOyVklgXycLB6ek= -github.com/tklauser/numcpus v0.2.1 h1:ct88eFm+Q7m2ZfXJdan1xYoXKlmwsfP+k88q05KvlZc= -github.com/tklauser/numcpus v0.2.1/go.mod h1:9aU+wOc6WjUIZEwWMP62PL/41d65P+iks1gBkr4QyP8= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc= github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw= @@ -408,13 +422,14 @@ github.com/ugorji/go/codec v1.1.13 h1:013LbFhocBoIqgHeIHKlV4JWYhqogATYWZhIcH0WHn github.com/ugorji/go/codec v1.1.13/go.mod h1:oNVt3Dq+FO91WNQ/9JnHKQP2QJxTzoN7wCBFCq1OeuU= github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/XcUArI= -github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= -github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= -github.com/valyala/fasthttp v1.22.0 h1:OpwH5KDOJ9cS2bq8fD+KfT4IrksK0llvkHf4MZx42jQ= -github.com/valyala/fasthttp v1.22.0/go.mod h1:0mw2RjXGOzxf4NL2jni3gUQ7LfjjUSiG5sskOUUSEpU= -github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio= -github.com/willf/bitset v1.1.11 h1:N7Z7E9UvjW+sGsEl7k/SJrvY2reP1A07MrGuCjIOjRE= -github.com/willf/bitset v1.1.11/go.mod h1:83CECat5yLh5zVOf4P1ErAgKA5UDvKtgyUABdr3+MjI= +github.com/vmihailenco/bufpool v0.1.11 h1:gOq2WmBrq0i2yW5QJ16ykccQ4wH9UyEsgLm6czKAd94= +github.com/vmihailenco/bufpool v0.1.11/go.mod h1:AFf/MOy3l2CFTKbxwt0mp2MwnqjNEs5H/UxrkA5jxTQ= +github.com/vmihailenco/go-tinylfu v0.2.0 h1:gRe/WurdOHaNrayn1anyWOgLkeC8xf0234kyLvkQWxM= +github.com/vmihailenco/go-tinylfu v0.2.0/go.mod h1:BLjA2pesPf7BH0jjFgrgB9uEgekHC4p9i8378iKOvdk= +github.com/vmihailenco/msgpack/v5 v5.1.0 h1:+od5YbEXxW95SPlW6beocmt8nOtlh83zqat5Ip9Hwdc= +github.com/vmihailenco/msgpack/v5 v5.1.0/go.mod h1:C5gboKD0TJPqWDTVTtrQNfRbiBwHZGo8UTqP/9/XvLI= +github.com/vmihailenco/tagparser v0.1.2 h1:gnjoVuB/kljJ5wICEEOpx98oXMWPLj22G67Vbd1qPqc= +github.com/vmihailenco/tagparser v0.1.2/go.mod 
h1:OeAg3pn3UbLjkWt+rN9oFYB6u/cQgqMEUPoW2WPyhdI= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= github.com/xo/dburl v0.7.0 h1:sGcxE2sefO6yJwX99EDzZCU0AACvWUKEqT4eIHFDPmY= github.com/xo/dburl v0.7.0/go.mod h1:W68zXnBfTb4zcKLI1yEYRyYIQjcjoyCRn4YMD/QzcpE= @@ -422,6 +437,7 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= +go.opentelemetry.io/otel v0.15.0/go.mod h1:e4GKElweB8W2gWUqbghw0B8t5MCTccc9212eNHnOHwA= go.opentelemetry.io/otel v0.20.0 h1:eaP0Fqu7SXHwvjiqDq83zImeehOHX8doTvU9AwXON8g= go.opentelemetry.io/otel v0.20.0/go.mod h1:Y3ugLH2oa81t5QO+Lty+zXf8zC9L26ax4Nzoxm/dooo= go.opentelemetry.io/otel/exporters/trace/jaeger v0.20.0 h1:FoclOadJNul1vUiKnZU0sKFWOZtZQq3jUzSbrX2jwNM= @@ -453,14 +469,16 @@ golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20190611184440-5c40567a22f8/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2 h1:It14KIkyBFYkHkwZ7k45minvA9aorojkyjGk9KJ5B/w= golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= +golang.org/x/exp v0.0.0-20190731235908-ec7cb31e5a56/go.mod h1:JhuoJpWY28nO4Vef9tZUw9qufEGTyX1+7lmHxV5q5G4= golang.org/x/exp v0.0.0-20190829153037-c13cbed26979/go.mod h1:86+5VVa7VpoJ4kLfm080zCjGlMRFzhUhsZKEZO7MGek= golang.org/x/exp v0.0.0-20191030013958-a1ab85dbe136/go.mod h1:JXzH8nQsPlswgeRAPE3MuO9GYsAcnJvJ4vnMwN/5qkY= +golang.org/x/exp v0.0.0-20201221025956-e89b829e73ea h1:GnGfrp0fiNhiBS/v/aCFTmfEWgkvxW4Qiu8oM2/IfZ4= +golang.org/x/exp v0.0.0-20201221025956-e89b829e73ea/go.mod h1:I6l2HNBLBZEcrOoCpyKLdY2lHoRZ8lI4x60KMCQDft4= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= @@ -473,10 +491,15 @@ golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE= golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o= +golang.org/x/mobile v0.0.0-20201217150744-e6ae53a27f4f/go.mod h1:skQtrUTUwhdJvXM/2KKJzY8pDgNr9I/FOMqDVRPBUS4= golang.org/x/mod 
v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc= golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY= +golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= +golang.org/x/mod v0.1.1-0.20191209134235-331c550502dd/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0 h1:RM4zey1++hCTbCVQfnWeKs9/IEsaBLA8vTkd0WVtmH4= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.1-0.20200828183125-ce943fd02449 h1:xUIPaMhvROX9dhPvRCenIJtU78+lbEenGbgqB5hfHCQ= +golang.org/x/mod v0.3.1-0.20200828183125-ce943fd02449/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -497,10 +520,11 @@ golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190827160401-ba9fcec4b297/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= +golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= -golang.org/x/net v0.0.0-20210226101413-39120d07d75e/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110 h1:qWPm9rbaAMKs8Bq/9LRpbMqxWRVUAQwMI9fVrssnTfw= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -512,6 +536,7 @@ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -532,18 +557,19 @@ golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190610200419-93c9922d18ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys 
v0.0.0-20190616124812-15dcb6c0061f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201112073958-5cba982894dd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210217105451-b926d437f341/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210225134936-a50acf3fe073/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210309074719-68d13333faf2 h1:46ULzRKLh1CwgRq2dC5SlBzEqqNCi8rreOZnNrbqcIY= golang.org/x/sys v0.0.0-20210309074719-68d13333faf2/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20201210144234-2321bbc49cbf h1:MZ2shdL+ZM/XzY3ZGOnh4Nlpnxz5GSOhOmtHo3iPU6M= golang.org/x/term v0.0.0-20201210144234-2321bbc49cbf/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= @@ -583,8 +609,12 @@ golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191112195655-aa38f8e97acc/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200117012304-6edc0a871e69/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= +golang.org/x/tools v0.0.0-20200207183749-b753a1ba74fa/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/tools v0.0.0-20201120155355-20be4ac4bd6e h1:t96dS3DO8DGjawSLJL/HIdz8CycAd2v07XxqB3UPTi0= golang.org/x/tools v0.0.0-20201120155355-20be4ac4bd6e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e h1:4nW4NLDYnU28ojHaHO8OVxFHk/aQ33U01a9cjED+pzE= +golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors 
v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
@@ -646,6 +676,7 @@ gopkg.in/ini.v1 v1.51.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
 gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8=
 gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k=
 gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo=
+gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
 gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
 gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74=
 gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
diff --git a/manager/config/config.go b/manager/config/config.go
index 6391ee35d..05e7751b9 100644
--- a/manager/config/config.go
+++ b/manager/config/config.go
@@ -10,6 +10,7 @@ type Config struct {
 	base.Options `yaml:",inline" mapstructure:",squash"`
 	Server       *ServerConfig    `yaml:"server" mapstructure:"server"`
 	Configure    *ConfigureConfig `yaml:"configure" mapstructure:"configure"`
+	Redis        *RedisConfig     `yaml:"redis" mapstructure:"redis"`
 	Stores       []*StoreConfig   `yaml:"stores" mapstructure:"stores"`
 	HostService  *HostService     `yaml:"host-service" mapstructure:"host-service"`
 }
@@ -26,8 +27,7 @@ type ConfigureConfig struct {
 
 type MysqlConfig struct {
 	User     string `yaml:"user" mapstructure:"user"`
 	Password string `yaml:"password" mapstructure:"password"`
-	IP       string `yaml:"ip" mapstructure:"ip"`
-	Port     int    `yaml:"port" mapstructure:"port"`
+	Addr     string `yaml:"addr" mapstructure:"addr"`
 	Db       string `yaml:"db" mapstructure:"db"`
 }
@@ -44,6 +44,12 @@ type StoreConfig struct {
 type HostService struct {
 }
 
+type RedisConfig struct {
+	User     string   `yaml:"user" mapstructure:"user"`
+	Password string   `yaml:"password" mapstructure:"password"`
+	Addrs    []string `yaml:"addr" mapstructure:"addrs"`
+}
+
 type SkylineService struct {
 	Domain  string `yaml:"domain" mapstructure:"domain"`
 	AppName string `yaml:"app-name" mapstructure:"app-name"`
@@ -59,6 +65,11 @@ func New() *Config {
 		Configure: &ConfigureConfig{
 			StoreName: "store1",
 		},
+		Redis: &RedisConfig{
+			User:     "",
+			Password: "",
+			Addrs:    []string{"127.0.0.1:6379"},
+		},
 		Stores: []*StoreConfig{
 			{
 				Name: "store1",
@@ -66,9 +77,8 @@ func New() *Config {
 				Mysql: &MysqlConfig{
 					User:     "root",
 					Password: "root1234",
-					IP:       "127.0.0.1",
-					Port:     3306,
-					Db:       "config_db",
+					Addr:     "127.0.0.1:3306",
+					Db:       "dragonfly_manager",
 				},
 				Oss: nil,
 			},
@@ -91,12 +101,8 @@ func (cfg *StoreConfig) Valid() error {
 		return dferrors.Newf(dfcodes.ManagerConfigError, "store config error: Mysql.Password is null")
 	}
 
-	if cfg.Mysql.Port == 0 {
-		return dferrors.Newf(dfcodes.ManagerConfigError, "store config error: Mysql.Port is null")
-	}
-
-	if len(cfg.Mysql.IP) == 0 {
-		return dferrors.Newf(dfcodes.ManagerConfigError, "store config error: Mysql.IP is null")
+	if len(cfg.Mysql.Addr) == 0 {
+		return dferrors.Newf(dfcodes.ManagerConfigError, "store config error: Mysql.Addr is null")
 	}
 
 	if len(cfg.Mysql.Db) == 0 {
@@ -113,6 +119,26 @@ func (cfg *StoreConfig) Valid() error {
 	return nil
 }
 
-func (cfg *Config) Valid() error {
+func (cfg *RedisConfig) Valid() error {
+	if len(cfg.Addrs) == 0 {
+		return dferrors.Newf(dfcodes.ManagerConfigError, "redis config error: Addrs is null")
+	}
+
+	return nil
+}
+
+func (cfg *Config) Valid() error {
+	if cfg.Redis == nil {
+		return dferrors.Newf(dfcodes.ManagerConfigError, "redis config error: Redis is null")
+	}
+
+	if err := cfg.Redis.Valid(); err != nil {
+		return err
+	}
+
+	if len(cfg.Stores) <= 0 {
+		return dferrors.Newf(dfcodes.ManagerConfigError, "stores config error: Stores is null")
+	}
+
 	return nil
 }
diff --git a/manager/configsvc/service.go b/manager/configsvc/service.go
index ae5831ac0..b635f41b7 100644
--- a/manager/configsvc/service.go
+++ b/manager/configsvc/service.go
@@ -7,13 +7,18 @@ import (
 	"time"
 
 	"d7y.io/dragonfly/v2/manager/apis/v2/types"
+	"d7y.io/dragonfly/v2/manager/dc"
 	"d7y.io/dragonfly/v2/manager/host"
 	"d7y.io/dragonfly/v2/manager/hostidentifier"
+	"d7y.io/dragonfly/v2/manager/lease"
 	"d7y.io/dragonfly/v2/manager/store"
+	"d7y.io/dragonfly/v2/pkg/cache"
 	"d7y.io/dragonfly/v2/pkg/dfcodes"
 	"d7y.io/dragonfly/v2/pkg/dferrors"
 	logger "d7y.io/dragonfly/v2/pkg/dflog"
 	"d7y.io/dragonfly/v2/pkg/rpc/manager"
+	"d7y.io/dragonfly/v2/pkg/util/net/iputils"
+	rCache "github.com/go-redis/cache/v8"
 )
 
 var KeepAliveTimeoutMax = 60 * time.Second
@@ -28,52 +33,148 @@ const (
 	CDNInstancePrefix string = "cins-"
 )
 
-type schedulerInstance struct {
-	instance      *types.SchedulerInstance
-	keepAliveTime time.Time
-}
-
-type cdnInstance struct {
-	instance      *types.CDNInstance
-	keepAliveTime time.Time
-}
-
 type ConfigSvc struct {
 	mu         sync.Mutex
 	store      store.Store
 	identifier hostidentifier.Identifier
+	lessor     lease.Lessor
 
-	schClusters    map[string]*types.SchedulerCluster
-	cdnClusters    map[string]*types.CDNCluster
-	schInstances   map[string]*schedulerInstance
-	cdnInstances   map[string]*cdnInstance
-	securityDomain map[string]*types.SecurityDomain
+	schedulerClusters  cache.Cache
+	cdnClusters        cache.Cache
+	schedulerInstances cache.Cache
+	cdnInstances       cache.Cache
+	securityDomain     cache.Cache
+
+	keepAliveCache      *rCache.Cache
+	keepAliveTTL        time.Duration
+	keepAliveTimeoutMax time.Duration
+	checkKeepAliveTime  time.Duration
+
+	keepAliveLeaseID          lease.LeaseID
+	grantKeepAliveLeaseIDTime time.Duration
 
 	stopC chan struct{}
 	wg    sync.WaitGroup
 }
 
-func NewConfigSvc(store store.Store, identifier hostidentifier.Identifier) (*ConfigSvc, error) {
+func NewConfigSvc(store store.Store, identifier hostidentifier.Identifier, lessor lease.Lessor, client *dc.RedisClient) (*ConfigSvc, error) {
 	svc := &ConfigSvc{
-		store:          store,
-		identifier:     identifier,
-		schClusters:    make(map[string]*types.SchedulerCluster),
-		cdnClusters:    make(map[string]*types.CDNCluster),
-		schInstances:   make(map[string]*schedulerInstance),
-		cdnInstances:   make(map[string]*cdnInstance),
-		securityDomain: make(map[string]*types.SecurityDomain),
-		stopC:          make(chan struct{}),
+		store:              store,
+		identifier:         identifier,
+		lessor:             lessor,
+		schedulerClusters:  cache.New(5*time.Minute, 5*time.Second),
+		cdnClusters:        cache.New(5*time.Minute, 5*time.Second),
+		schedulerInstances: cache.New(5*time.Minute, 5*time.Second),
+		cdnInstances:       cache.New(5*time.Minute, 5*time.Second),
+		securityDomain:     cache.New(5*time.Minute, 5*time.Second),
+		stopC:              make(chan struct{}),
 	}
+	svc.schedulerClusters.OnEvicted(svc.schClusterOnEvicted)
+	svc.cdnClusters.OnEvicted(svc.cdnClusterOnEvicted)
+	svc.schedulerInstances.OnEvicted(svc.schInstanceOnEvicted)
+	svc.cdnInstances.OnEvicted(svc.cdnInstanceOnEvicted)
+	svc.securityDomain.OnEvicted(svc.securityDomainOnEvicted)
+
+	/* keepAlive */
+	if client.Client != nil {
+		svc.keepAliveCache = rCache.New(&rCache.Options{
+			Redis: client.Client,
+		})
+	} else {
+
svc.keepAliveCache = rCache.New(&rCache.Options{ + Redis: client.ClusterClient, + }) + } + svc.keepAliveTTL = 5 * time.Minute + svc.keepAliveTimeoutMax = KeepAliveTimeoutMax + svc.checkKeepAliveTime = KeepAliveTimeoutMax / 5 + + /* keepAliveLease */ + svc.keepAliveLeaseID = lease.NoLease + svc.grantKeepAliveLeaseIDTime = 5 * time.Second + if err := svc.rebuild(); err != nil { return nil, err } - svc.wg.Add(1) + svc.wg.Add(2) + go svc.grantKeepAliveLeaseLoop() go svc.checkKeepAliveLoop() return svc, nil } +func (svc *ConfigSvc) schClusterOnEvicted(s string, i interface{}) { + _, _ = svc.GetSchedulerCluster(context.TODO(), s, store.WithSkipLocalCache()) +} + +func (svc *ConfigSvc) schInstanceOnEvicted(s string, i interface{}) { + _, _ = svc.GetSchedulerInstance(context.TODO(), s, store.WithSkipLocalCache()) +} + +func (svc *ConfigSvc) cdnClusterOnEvicted(s string, i interface{}) { + _, _ = svc.GetCDNCluster(context.TODO(), s, store.WithSkipLocalCache()) +} + +func (svc *ConfigSvc) cdnInstanceOnEvicted(s string, i interface{}) { + _, _ = svc.GetCDNInstance(context.TODO(), s, store.WithSkipLocalCache()) +} + +func (svc *ConfigSvc) securityDomainOnEvicted(s string, i interface{}) { + _, _ = svc.GetSecurityDomain(context.TODO(), s, store.WithSkipLocalCache()) +} + +func (svc *ConfigSvc) grantKeepAliveLease() (lease.LeaseID, chan struct{}, error) { + ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) + defer cancel() + + key := "ConfigSvc/keepAliveLeaseID" + value := iputils.HostName + leaseID, err := svc.lessor.Grant(ctx, key, value, 10) + if err != nil { + logger.Infof("grant lease error: key %s, value %s, error %s", key, value, err.Error()) + return lease.NoLease, nil, err + } + + ch, err := svc.lessor.KeepAlive(ctx, leaseID) + if err != nil { + svc.lessor.Revoke(ctx, leaseID) + logger.Errorf("keepalive lease error: key %s, value %s, leaseID %s, error %s", key, value, leaseID.String(), err.Error()) + return lease.NoLease, nil, err + } + + logger.Infof("grant lease successful: key %s, value %s, leaseID %s", key, value, leaseID.String()) + return leaseID, ch, nil +} + +func (svc *ConfigSvc) grantKeepAliveLeaseLoop() { + defer svc.wg.Done() + + var ka chan struct{} + id, ch, err := svc.grantKeepAliveLease() + if err == nil { + svc.keepAliveLeaseID = id + ka = ch + } + + for { + select { + case <-svc.stopC: + return + case <-ka: + svc.keepAliveLeaseID = lease.NoLease + case <-time.After(svc.grantKeepAliveLeaseIDTime): + if svc.keepAliveLeaseID == lease.NoLease { + id, ch, err := svc.grantKeepAliveLease() + if err == nil { + svc.keepAliveLeaseID = id + ka = ch + } + } + } + } +} + func (svc *ConfigSvc) checkKeepAliveLoop() { defer svc.wg.Done() @@ -81,49 +182,82 @@ func (svc *ConfigSvc) checkKeepAliveLoop() { select { case <-svc.stopC: return - case <-time.After(KeepAliveTimeoutMax): - svc.updateAllInstanceState() + case <-time.After(svc.checkKeepAliveTime): + if svc.keepAliveLeaseID != lease.NoLease { + svc.updateAllInstanceState() + } } } } +func (svc *ConfigSvc) setKeepAlive(ctx context.Context, instanceID string) error { + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + return svc.keepAliveCache.Set(&rCache.Item{ + Ctx: ctx, + Key: instanceID, + Value: time.Now(), + TTL: svc.keepAliveTTL, + }) +} + +func (svc *ConfigSvc) getKeepAlive(ctx context.Context, instanceID string) (time.Time, error) { + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + var v time.Time + err := svc.keepAliveCache.Get(ctx, instanceID, &v) + if err != 
nil { + return time.Now(), err + } + + return v, err +} + +func (svc *ConfigSvc) deleteKeepAlive(ctx context.Context, instanceID string) error { + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + return svc.keepAliveCache.Delete(ctx, instanceID) +} + func (svc *ConfigSvc) updateAllInstanceState() { - var schInstances []types.SchedulerInstance - var cdnInstances []types.CDNInstance - - svc.mu.Lock() now := time.Now() - for _, instance := range svc.schInstances { - if now.Before(instance.keepAliveTime.Add(KeepAliveTimeoutMax)) { + for instanceID, item := range svc.schedulerInstances.Items() { + keepAliveTime, err := svc.getKeepAlive(context.TODO(), instanceID) + if err != nil { continue } - if state, ok := instanceNextState(instance.instance.State, true); ok { - inter := *instance.instance - inter.State = state - schInstances = append(schInstances, inter) - } - } - - for _, instance := range svc.cdnInstances { - if now.Before(instance.keepAliveTime.Add(KeepAliveTimeoutMax)) { + if now.Before(keepAliveTime.Add(svc.keepAliveTimeoutMax)) { continue } - if state, ok := instanceNextState(instance.instance.State, true); ok { - inter := *instance.instance + instance := item.Object.(*types.SchedulerInstance) + if state, ok := instanceNextState(instance.State, true); ok { + inter := *instance inter.State = state - cdnInstances = append(cdnInstances, inter) + _, _ = svc.UpdateSchedulerInstance(context.TODO(), &inter, store.WithKeepalive()) } } - svc.mu.Unlock() - for _, instance := range schInstances { - svc.UpdateSchedulerInstance(context.TODO(), &instance) - } + for instanceID, item := range svc.cdnInstances.Items() { + keepAliveTime, err := svc.getKeepAlive(context.TODO(), instanceID) + if err != nil { + continue + } - for _, instance := range cdnInstances { - svc.UpdateCDNInstance(context.TODO(), &instance) + if now.Before(keepAliveTime.Add(svc.keepAliveTimeoutMax)) { + continue + } + + instance := item.Object.(*types.CDNInstance) + if state, ok := instanceNextState(instance.State, true); ok { + inter := *instance + inter.State = state + _, _ = svc.UpdateCDNInstance(context.TODO(), &inter, store.WithKeepalive()) + } } } @@ -137,7 +271,7 @@ func (svc *ConfigSvc) rebuild() error { break } else { for _, cluster := range clusters { - svc.schClusters[cluster.ClusterID] = cluster + _ = svc.schedulerClusters.Add(cluster.ClusterID, cluster, cache.DefaultExpiration) } } } @@ -149,7 +283,7 @@ func (svc *ConfigSvc) rebuild() error { break } else { for _, cluster := range clusters { - svc.cdnClusters[cluster.ClusterID] = cluster + _ = svc.cdnClusters.Add(cluster.ClusterID, cluster, cache.DefaultExpiration) } } } @@ -161,10 +295,7 @@ func (svc *ConfigSvc) rebuild() error { break } else { for _, instance := range instances { - svc.schInstances[instance.InstanceID] = &schedulerInstance{ - instance: instance, - keepAliveTime: time.Now(), - } + _ = svc.schedulerInstances.Add(instance.InstanceID, instance, cache.DefaultExpiration) svc.identifier.Put(SchedulerInstancePrefix+instance.HostName, instance.InstanceID) } } @@ -177,10 +308,7 @@ func (svc *ConfigSvc) rebuild() error { break } else { for _, instance := range instances { - svc.cdnInstances[instance.InstanceID] = &cdnInstance{ - instance: instance, - keepAliveTime: time.Now(), - } + _ = svc.cdnInstances.Add(instance.InstanceID, instance, cache.DefaultExpiration) svc.identifier.Put(CDNInstancePrefix+instance.HostName, instance.InstanceID) } } @@ -193,7 +321,7 @@ func (svc *ConfigSvc) rebuild() error { break } else { for _, domain := 
range domains { - svc.securityDomain[domain.SecurityDomain] = domain + _ = svc.securityDomain.Add(domain.SecurityDomain, domain, cache.DefaultExpiration) } } } @@ -201,32 +329,33 @@ func (svc *ConfigSvc) rebuild() error { return nil } -func (svc *ConfigSvc) Stop() { - svc.stopC <- struct{}{} +func (svc *ConfigSvc) Close() error { + close(svc.stopC) svc.wg.Wait() + svc.stopC = nil + return nil } func (svc *ConfigSvc) GetSchedulers(ctx context.Context, hostInfo *host.Info) ([]string, error) { - nodes := []string{} - svc.mu.Lock() - defer svc.mu.Unlock() + var nodes []string - for _, instance := range svc.schInstances { - if instance.instance.State != InstanceActive { + for _, item := range svc.schedulerInstances.Items() { + instance := item.Object.(*types.SchedulerInstance) + if instance.State != InstanceActive { continue } if hostInfo.IsDefault() { - nodes = append(nodes, fmt.Sprintf("%s:%d", instance.instance.IP, instance.instance.Port)) + nodes = append(nodes, fmt.Sprintf("%s:%d", instance.IP, instance.Port)) continue } - if instance.instance.SecurityDomain != hostInfo.SecurityDomain { + if instance.SecurityDomain != hostInfo.SecurityDomain { continue } - if instance.instance.IDC == hostInfo.IDC { - nodes = append(nodes, fmt.Sprintf("%s:%d", instance.instance.IP, instance.instance.Port)) + if instance.IDC == hostInfo.IDC { + nodes = append(nodes, fmt.Sprintf("%s:%d", instance.IP, instance.Port)) continue } } @@ -257,16 +386,19 @@ func (svc *ConfigSvc) KeepAlive(ctx context.Context, req *manager.KeepAliveReque return dferrors.Newf(dfcodes.ManagerError, "hostname not exist, %s", req.GetHostName()) } - instance, exist := svc.getSchedulerInstance(ctx, instanceID) - if !exist { - return dferrors.Newf(dfcodes.ManagerError, "Scheduler instance not exist, instanceID %s", instanceID) + if err := svc.setKeepAlive(ctx, instanceID); err != nil { + return err } - instance.keepAliveTime = time.Now() - if state, ok := instanceNextState(instance.instance.State, false); ok { - inter := *instance.instance + instance, err := svc.GetSchedulerInstance(ctx, instanceID) + if err != nil { + return dferrors.Newf(dfcodes.ManagerError, "Scheduler instance not exist, instanceID %s, error %s", instanceID, err.Error()) + } + + if state, ok := instanceNextState(instance.State, false); ok { + inter := *instance inter.State = state - _, err := svc.UpdateSchedulerInstance(ctx, &inter) + _, err := svc.UpdateSchedulerInstance(ctx, &inter, store.WithKeepalive()) return err } @@ -277,16 +409,19 @@ func (svc *ConfigSvc) KeepAlive(ctx context.Context, req *manager.KeepAliveReque return dferrors.Newf(dfcodes.ManagerError, "hostname not exist, %s", req.GetHostName()) } - instance, exist := svc.getCDNInstance(ctx, instanceID) - if !exist { - return dferrors.Newf(dfcodes.ManagerError, "Cdn instance not exist, instanceID %s", instanceID) + if err := svc.setKeepAlive(ctx, instanceID); err != nil { + return err } - instance.keepAliveTime = time.Now() - if state, ok := instanceNextState(instance.instance.State, false); ok { - inter := *instance.instance + instance, err := svc.GetCDNInstance(ctx, instanceID) + if err != nil { + return dferrors.Newf(dfcodes.ManagerError, "Cdn instance not exist, instanceID %s, error %s", instanceID, err.Error()) + } + + if state, ok := instanceNextState(instance.State, false); ok { + inter := *instance inter.State = state - _, err := svc.UpdateCDNInstance(ctx, &inter) + _, err := svc.UpdateCDNInstance(ctx, &inter, store.WithKeepalive()) return err } @@ -336,78 +471,59 @@ func (svc *ConfigSvc) 
GetInstanceAndClusterConfig(ctx context.Context, req *mana } } -func (svc *ConfigSvc) AddSchedulerCluster(ctx context.Context, cluster *types.SchedulerCluster) (*types.SchedulerCluster, error) { +func (svc *ConfigSvc) AddSchedulerCluster(ctx context.Context, cluster *types.SchedulerCluster, opts ...store.OpOption) (*types.SchedulerCluster, error) { cluster.ClusterID = NewUUID(SchedulerClusterPrefix) - inter, err := svc.store.Add(ctx, cluster.ClusterID, cluster, store.WithResourceType(store.SchedulerCluster)) + inter, err := svc.store.Add(ctx, cluster.ClusterID, cluster, append(opts, store.WithResourceType(store.SchedulerCluster))...) if err != nil { return nil, err } cluster = inter.(*types.SchedulerCluster) - svc.mu.Lock() - defer svc.mu.Unlock() - svc.schClusters[cluster.ClusterID] = cluster + _ = svc.schedulerClusters.Add(cluster.ClusterID, cluster, cache.DefaultExpiration) return cluster, nil } -func (svc *ConfigSvc) DeleteSchedulerCluster(ctx context.Context, clusterID string) (*types.SchedulerCluster, error) { - if inter, err := svc.store.Delete(ctx, clusterID, store.WithResourceType(store.SchedulerCluster)); err != nil { +func (svc *ConfigSvc) DeleteSchedulerCluster(ctx context.Context, clusterID string, opts ...store.OpOption) (*types.SchedulerCluster, error) { + if inter, err := svc.store.Delete(ctx, clusterID, append(opts, store.WithResourceType(store.SchedulerCluster))...); err != nil { return nil, err } else if inter == nil { return nil, nil } else { cluster := inter.(*types.SchedulerCluster) - svc.mu.Lock() - defer svc.mu.Unlock() - if _, exist := svc.schClusters[cluster.ClusterID]; exist { - delete(svc.schClusters, cluster.ClusterID) - } + svc.schedulerClusters.Delete(cluster.ClusterID) return cluster, nil } } -func (svc *ConfigSvc) UpdateSchedulerCluster(ctx context.Context, cluster *types.SchedulerCluster) (*types.SchedulerCluster, error) { - inter, err := svc.store.Update(ctx, cluster.ClusterID, cluster, store.WithResourceType(store.SchedulerCluster)) +func (svc *ConfigSvc) UpdateSchedulerCluster(ctx context.Context, cluster *types.SchedulerCluster, opts ...store.OpOption) (*types.SchedulerCluster, error) { + inter, err := svc.store.Update(ctx, cluster.ClusterID, cluster, append(opts, store.WithResourceType(store.SchedulerCluster))...) 
if err != nil { return nil, err } cluster = inter.(*types.SchedulerCluster) - svc.mu.Lock() - defer svc.mu.Unlock() - if _, exist := svc.schClusters[cluster.ClusterID]; exist { - delete(svc.schClusters, cluster.ClusterID) - } - svc.schClusters[cluster.ClusterID] = cluster + svc.schedulerClusters.SetDefault(cluster.ClusterID, cluster) return cluster, nil } -func (svc *ConfigSvc) getSchedulerCluster(ctx context.Context, clusterID string) (*types.SchedulerCluster, bool) { - svc.mu.Lock() - defer svc.mu.Unlock() - if cur, exist := svc.schClusters[clusterID]; exist { - return cur, true - } +func (svc *ConfigSvc) GetSchedulerCluster(ctx context.Context, clusterID string, opts ...store.OpOption) (*types.SchedulerCluster, error) { + op := store.Op{} + op.ApplyOpts(opts) - return nil, false -} - -func (svc *ConfigSvc) GetSchedulerCluster(ctx context.Context, clusterID string) (*types.SchedulerCluster, error) { - if cluster, exist := svc.getSchedulerCluster(ctx, clusterID); exist { - return cluster, nil - } else if inter, err := svc.store.Get(ctx, clusterID, store.WithResourceType(store.SchedulerCluster)); err != nil { - return nil, err - } else { - cluster := inter.(*types.SchedulerCluster) - svc.mu.Lock() - defer svc.mu.Unlock() - if _, exist := svc.schClusters[cluster.ClusterID]; exist { - delete(svc.schClusters, cluster.ClusterID) + if !op.SkipLocalCache { + if cluster, _, exist := svc.schedulerClusters.GetWithExpiration(clusterID); exist { + return cluster.(*types.SchedulerCluster), nil } - - svc.schClusters[cluster.ClusterID] = cluster - return cluster, nil } + + inter, err := svc.store.Get(ctx, clusterID, append(opts, store.WithResourceType(store.SchedulerCluster))...) + if err != nil { + return nil, err + } + + cluster := inter.(*types.SchedulerCluster) + svc.schedulerClusters.SetDefault(cluster.ClusterID, cluster) + return cluster, nil } func (svc *ConfigSvc) ListSchedulerClusters(ctx context.Context, opts ...store.OpOption) ([]*types.SchedulerCluster, error) { @@ -424,103 +540,64 @@ func (svc *ConfigSvc) ListSchedulerClusters(ctx context.Context, opts ...store.O return clusters, nil } -func (svc *ConfigSvc) AddSchedulerInstance(ctx context.Context, instance *types.SchedulerInstance) (*types.SchedulerInstance, error) { +func (svc *ConfigSvc) AddSchedulerInstance(ctx context.Context, instance *types.SchedulerInstance, opts ...store.OpOption) (*types.SchedulerInstance, error) { instance.InstanceID = NewUUID(SchedulerInstancePrefix) instance.State = InstanceInactive - inter, err := svc.store.Add(ctx, instance.InstanceID, instance, store.WithResourceType(store.SchedulerInstance)) + inter, err := svc.store.Add(ctx, instance.InstanceID, instance, append(opts, store.WithResourceType(store.SchedulerInstance))...) 
if err != nil { return nil, err } instance = inter.(*types.SchedulerInstance) - svc.mu.Lock() - defer svc.mu.Unlock() - svc.schInstances[instance.InstanceID] = &schedulerInstance{ - instance: instance, - keepAliveTime: time.Now(), - } - + _ = svc.schedulerInstances.Add(instance.InstanceID, instance, cache.DefaultExpiration) svc.identifier.Put(SchedulerInstancePrefix+instance.HostName, instance.InstanceID) + _ = svc.setKeepAlive(ctx, instance.InstanceID) return instance, nil } -func (svc *ConfigSvc) DeleteSchedulerInstance(ctx context.Context, instanceID string) (*types.SchedulerInstance, error) { - if inter, err := svc.store.Delete(ctx, instanceID, store.WithResourceType(store.SchedulerInstance)); err != nil { +func (svc *ConfigSvc) DeleteSchedulerInstance(ctx context.Context, instanceID string, opts ...store.OpOption) (*types.SchedulerInstance, error) { + if inter, err := svc.store.Delete(ctx, instanceID, append(opts, store.WithResourceType(store.SchedulerInstance))...); err != nil { return nil, err } else if inter == nil { return nil, nil } else { instance := inter.(*types.SchedulerInstance) - svc.mu.Lock() - defer svc.mu.Unlock() - if _, exist := svc.schInstances[instance.InstanceID]; exist { - delete(svc.schInstances, instance.InstanceID) - } + svc.schedulerInstances.Delete(instance.InstanceID) svc.identifier.Delete(SchedulerInstancePrefix + instance.HostName) + _ = svc.deleteKeepAlive(ctx, instance.InstanceID) return instance, nil } } -func (svc *ConfigSvc) UpdateSchedulerInstance(ctx context.Context, instance *types.SchedulerInstance) (*types.SchedulerInstance, error) { - inter, err := svc.store.Update(ctx, instance.InstanceID, instance, store.WithResourceType(store.SchedulerInstance)) +func (svc *ConfigSvc) UpdateSchedulerInstance(ctx context.Context, instance *types.SchedulerInstance, opts ...store.OpOption) (*types.SchedulerInstance, error) { + inter, err := svc.store.Update(ctx, instance.InstanceID, instance, append(opts, store.WithResourceType(store.SchedulerInstance))...) 
if err != nil { return nil, err } instance = inter.(*types.SchedulerInstance) - svc.mu.Lock() - defer svc.mu.Unlock() - - var keepAliveTime time.Time - if old, exist := svc.schInstances[instance.InstanceID]; exist { - keepAliveTime = old.keepAliveTime - delete(svc.schInstances, instance.InstanceID) - } else { - keepAliveTime = time.Now() - } - - svc.schInstances[instance.InstanceID] = &schedulerInstance{ - instance: instance, - keepAliveTime: keepAliveTime, - } - + svc.schedulerInstances.SetDefault(instance.InstanceID, instance) return instance, nil } -func (svc *ConfigSvc) getSchedulerInstance(ctx context.Context, instanceID string) (*schedulerInstance, bool) { - svc.mu.Lock() - defer svc.mu.Unlock() - if cur, exist := svc.schInstances[instanceID]; exist { - return cur, true +func (svc *ConfigSvc) GetSchedulerInstance(ctx context.Context, instanceID string, opts ...store.OpOption) (*types.SchedulerInstance, error) { + op := store.Op{} + op.ApplyOpts(opts) + + if !op.SkipLocalCache { + if instance, _, exist := svc.schedulerInstances.GetWithExpiration(instanceID); exist { + return instance.(*types.SchedulerInstance), nil + } } - return nil, false -} - -func (svc *ConfigSvc) GetSchedulerInstance(ctx context.Context, instanceID string) (*types.SchedulerInstance, error) { - if instance, exist := svc.getSchedulerInstance(ctx, instanceID); exist { - return instance.instance, nil - } else if inter, err := svc.store.Get(ctx, instanceID, store.WithResourceType(store.SchedulerInstance)); err != nil { + inter, err := svc.store.Get(ctx, instanceID, append(opts, store.WithResourceType(store.SchedulerInstance))...) + if err != nil { return nil, err - } else { - instance := inter.(*types.SchedulerInstance) - svc.mu.Lock() - defer svc.mu.Unlock() - - var keepAliveTime time.Time - if old, exist := svc.schInstances[instance.InstanceID]; exist { - keepAliveTime = old.keepAliveTime - delete(svc.schInstances, instance.InstanceID) - } else { - keepAliveTime = time.Now() - } - - svc.schInstances[instance.InstanceID] = &schedulerInstance{ - instance: instance, - keepAliveTime: keepAliveTime, - } - return instance, nil } + + instance := inter.(*types.SchedulerInstance) + svc.schedulerInstances.SetDefault(instance.InstanceID, instance) + return instance, nil } func (svc *ConfigSvc) ListSchedulerInstances(ctx context.Context, opts ...store.OpOption) ([]*types.SchedulerInstance, error) { @@ -537,81 +614,59 @@ func (svc *ConfigSvc) ListSchedulerInstances(ctx context.Context, opts ...store. return instances, nil } -func (svc *ConfigSvc) AddCDNCluster(ctx context.Context, cluster *types.CDNCluster) (*types.CDNCluster, error) { +func (svc *ConfigSvc) AddCDNCluster(ctx context.Context, cluster *types.CDNCluster, opts ...store.OpOption) (*types.CDNCluster, error) { cluster.ClusterID = NewUUID(CDNClusterPrefix) - inter, err := svc.store.Add(ctx, cluster.ClusterID, cluster, store.WithResourceType(store.CDNCluster)) + inter, err := svc.store.Add(ctx, cluster.ClusterID, cluster, append(opts, store.WithResourceType(store.CDNCluster))...) 
if err != nil { return nil, err } cluster = inter.(*types.CDNCluster) - svc.mu.Lock() - defer svc.mu.Unlock() - if _, exist := svc.cdnClusters[cluster.ClusterID]; exist { - delete(svc.cdnClusters, cluster.ClusterID) - } - svc.cdnClusters[cluster.ClusterID] = cluster + _ = svc.cdnClusters.Add(cluster.ClusterID, cluster, cache.DefaultExpiration) return cluster, nil } -func (svc *ConfigSvc) DeleteCDNCluster(ctx context.Context, clusterID string) (*types.CDNCluster, error) { - if inter, err := svc.store.Delete(ctx, clusterID, store.WithResourceType(store.CDNCluster)); err != nil { +func (svc *ConfigSvc) DeleteCDNCluster(ctx context.Context, clusterID string, opts ...store.OpOption) (*types.CDNCluster, error) { + if inter, err := svc.store.Delete(ctx, clusterID, append(opts, store.WithResourceType(store.CDNCluster))...); err != nil { return nil, err } else if inter == nil { return nil, nil } else { cluster := inter.(*types.CDNCluster) - svc.mu.Lock() - defer svc.mu.Unlock() - if _, exist := svc.cdnClusters[cluster.ClusterID]; exist { - delete(svc.cdnClusters, cluster.ClusterID) - } + svc.cdnClusters.Delete(cluster.ClusterID) return cluster, nil } } -func (svc *ConfigSvc) UpdateCDNCluster(ctx context.Context, cluster *types.CDNCluster) (*types.CDNCluster, error) { - inter, err := svc.store.Update(ctx, cluster.ClusterID, cluster, store.WithResourceType(store.CDNCluster)) +func (svc *ConfigSvc) UpdateCDNCluster(ctx context.Context, cluster *types.CDNCluster, opts ...store.OpOption) (*types.CDNCluster, error) { + inter, err := svc.store.Update(ctx, cluster.ClusterID, cluster, append(opts, store.WithResourceType(store.CDNCluster))...) if err != nil { return nil, err } cluster = inter.(*types.CDNCluster) - svc.mu.Lock() - defer svc.mu.Unlock() - if _, exist := svc.cdnClusters[cluster.ClusterID]; exist { - delete(svc.cdnClusters, cluster.ClusterID) - } - svc.cdnClusters[cluster.ClusterID] = cluster + svc.cdnClusters.SetDefault(cluster.ClusterID, cluster) return cluster, nil } -func (svc *ConfigSvc) getCDNCluster(ctx context.Context, clusterID string) (*types.CDNCluster, bool) { - svc.mu.Lock() - defer svc.mu.Unlock() - if cur, exist := svc.cdnClusters[clusterID]; exist { - return cur, true - } +func (svc *ConfigSvc) GetCDNCluster(ctx context.Context, clusterID string, opts ...store.OpOption) (*types.CDNCluster, error) { + op := store.Op{} + op.ApplyOpts(opts) - return nil, false -} - -func (svc *ConfigSvc) GetCDNCluster(ctx context.Context, clusterID string) (*types.CDNCluster, error) { - if cluster, exist := svc.getCDNCluster(ctx, clusterID); exist { - return cluster, nil - } else if inter, err := svc.store.Get(ctx, clusterID, store.WithResourceType(store.CDNCluster)); err != nil { - return nil, err - } else { - cluster := inter.(*types.CDNCluster) - svc.mu.Lock() - defer svc.mu.Unlock() - if _, exist := svc.cdnClusters[cluster.ClusterID]; exist { - delete(svc.cdnClusters, cluster.ClusterID) + if !op.SkipLocalCache { + if cluster, _, exist := svc.cdnClusters.GetWithExpiration(clusterID); exist { + return cluster.(*types.CDNCluster), nil } - - svc.cdnClusters[cluster.ClusterID] = cluster - return cluster, nil } + + inter, err := svc.store.Get(ctx, clusterID, append(opts, store.WithResourceType(store.CDNCluster))...) 
+ if err != nil { + return nil, err + } + + cluster := inter.(*types.CDNCluster) + svc.cdnClusters.SetDefault(cluster.ClusterID, cluster) + return cluster, nil } func (svc *ConfigSvc) ListCDNClusters(ctx context.Context, opts ...store.OpOption) ([]*types.CDNCluster, error) { @@ -628,104 +683,64 @@ func (svc *ConfigSvc) ListCDNClusters(ctx context.Context, opts ...store.OpOptio return clusters, nil } -func (svc *ConfigSvc) AddCDNInstance(ctx context.Context, instance *types.CDNInstance) (*types.CDNInstance, error) { +func (svc *ConfigSvc) AddCDNInstance(ctx context.Context, instance *types.CDNInstance, opts ...store.OpOption) (*types.CDNInstance, error) { instance.InstanceID = NewUUID(CDNInstancePrefix) instance.State = InstanceInactive - inter, err := svc.store.Add(ctx, instance.InstanceID, instance, store.WithResourceType(store.CDNInstance)) + inter, err := svc.store.Add(ctx, instance.InstanceID, instance, append(opts, store.WithResourceType(store.CDNInstance))...) if err != nil { return nil, err } instance = inter.(*types.CDNInstance) - svc.mu.Lock() - defer svc.mu.Unlock() - if _, exist := svc.cdnInstances[instance.InstanceID]; exist { - delete(svc.cdnInstances, instance.InstanceID) - } - svc.cdnInstances[instance.InstanceID] = &cdnInstance{ - instance: instance, - keepAliveTime: time.Now(), - } + _ = svc.cdnInstances.Add(instance.InstanceID, instance, cache.DefaultExpiration) svc.identifier.Put(CDNInstancePrefix+instance.HostName, instance.InstanceID) + _ = svc.setKeepAlive(ctx, instance.InstanceID) return instance, nil } -func (svc *ConfigSvc) DeleteCDNInstance(ctx context.Context, instanceID string) (*types.CDNInstance, error) { - if inter, err := svc.store.Delete(ctx, instanceID, store.WithResourceType(store.CDNInstance)); err != nil { +func (svc *ConfigSvc) DeleteCDNInstance(ctx context.Context, instanceID string, opts ...store.OpOption) (*types.CDNInstance, error) { + if inter, err := svc.store.Delete(ctx, instanceID, append(opts, store.WithResourceType(store.CDNInstance))...); err != nil { return nil, err } else if inter == nil { return nil, nil } else { instance := inter.(*types.CDNInstance) - svc.mu.Lock() - defer svc.mu.Unlock() - if _, exist := svc.cdnInstances[instance.InstanceID]; exist { - delete(svc.cdnInstances, instance.InstanceID) - } + svc.cdnInstances.Delete(instance.InstanceID) svc.identifier.Delete(CDNInstancePrefix + instance.HostName) + _ = svc.deleteKeepAlive(ctx, instance.InstanceID) return instance, nil } } -func (svc *ConfigSvc) UpdateCDNInstance(ctx context.Context, instance *types.CDNInstance) (*types.CDNInstance, error) { - inter, err := svc.store.Update(ctx, instance.InstanceID, instance, store.WithResourceType(store.CDNInstance)) +func (svc *ConfigSvc) UpdateCDNInstance(ctx context.Context, instance *types.CDNInstance, opts ...store.OpOption) (*types.CDNInstance, error) { + inter, err := svc.store.Update(ctx, instance.InstanceID, instance, append(opts, store.WithResourceType(store.CDNInstance))...) 
if err != nil { return nil, err } instance = inter.(*types.CDNInstance) - svc.mu.Lock() - defer svc.mu.Unlock() - - var keepAliveTime time.Time - if old, exist := svc.cdnInstances[instance.InstanceID]; exist { - keepAliveTime = old.keepAliveTime - delete(svc.cdnInstances, instance.InstanceID) - } else { - keepAliveTime = time.Now() - } - - svc.cdnInstances[instance.InstanceID] = &cdnInstance{ - instance: instance, - keepAliveTime: keepAliveTime, - } + svc.cdnInstances.SetDefault(instance.InstanceID, instance) return instance, nil } -func (svc *ConfigSvc) getCDNInstance(ctx context.Context, instanceID string) (*cdnInstance, bool) { - svc.mu.Lock() - defer svc.mu.Unlock() - if cur, exist := svc.cdnInstances[instanceID]; exist { - return cur, true +func (svc *ConfigSvc) GetCDNInstance(ctx context.Context, instanceID string, opts ...store.OpOption) (*types.CDNInstance, error) { + op := store.Op{} + op.ApplyOpts(opts) + + if !op.SkipLocalCache { + if instance, _, exist := svc.cdnInstances.GetWithExpiration(instanceID); exist { + return instance.(*types.CDNInstance), nil + } } - return nil, false -} - -func (svc *ConfigSvc) GetCDNInstance(ctx context.Context, instanceID string) (*types.CDNInstance, error) { - if instance, exist := svc.getCDNInstance(ctx, instanceID); exist { - return instance.instance, nil - } else if inter, err := svc.store.Get(ctx, instanceID, store.WithResourceType(store.CDNInstance)); err != nil { + inter, err := svc.store.Get(ctx, instanceID, append(opts, store.WithResourceType(store.CDNInstance))...) + if err != nil { return nil, err - } else { - instance := inter.(*types.CDNInstance) - svc.mu.Lock() - defer svc.mu.Unlock() - - var keepAliveTime time.Time - if old, exist := svc.cdnInstances[instance.InstanceID]; exist { - keepAliveTime = old.keepAliveTime - delete(svc.cdnInstances, instance.InstanceID) - } else { - keepAliveTime = time.Now() - } - - svc.cdnInstances[instance.InstanceID] = &cdnInstance{ - instance: instance, - keepAliveTime: keepAliveTime, - } - return instance, nil } + + instance := inter.(*types.CDNInstance) + svc.cdnInstances.SetDefault(instance.InstanceID, instance) + return instance, nil } func (svc *ConfigSvc) ListCDNInstances(ctx context.Context, opts ...store.OpOption) ([]*types.CDNInstance, error) { @@ -742,75 +757,58 @@ func (svc *ConfigSvc) ListCDNInstances(ctx context.Context, opts ...store.OpOpti return instances, nil } -func (svc *ConfigSvc) AddSecurityDomain(ctx context.Context, securityDomain *types.SecurityDomain) (*types.SecurityDomain, error) { - inter, err := svc.store.Add(ctx, securityDomain.SecurityDomain, securityDomain, store.WithResourceType(store.SecurityDomain)) +func (svc *ConfigSvc) AddSecurityDomain(ctx context.Context, securityDomain *types.SecurityDomain, opts ...store.OpOption) (*types.SecurityDomain, error) { + inter, err := svc.store.Add(ctx, securityDomain.SecurityDomain, securityDomain, append(opts, store.WithResourceType(store.SecurityDomain))...) 
if err != nil { return nil, err } domain := inter.(*types.SecurityDomain) - svc.mu.Lock() - defer svc.mu.Unlock() - svc.securityDomain[domain.SecurityDomain] = domain + _ = svc.securityDomain.Add(domain.SecurityDomain, domain, cache.DefaultExpiration) return domain, nil } -func (svc *ConfigSvc) DeleteSecurityDomain(ctx context.Context, securityDomain string) (*types.SecurityDomain, error) { - if inter, err := svc.store.Delete(ctx, securityDomain, store.WithResourceType(store.SecurityDomain)); err != nil { +func (svc *ConfigSvc) DeleteSecurityDomain(ctx context.Context, securityDomain string, opts ...store.OpOption) (*types.SecurityDomain, error) { + if inter, err := svc.store.Delete(ctx, securityDomain, append(opts, store.WithResourceType(store.SecurityDomain))...); err != nil { return nil, err } else if inter == nil { return nil, nil } else { domain := inter.(*types.SecurityDomain) - svc.mu.Lock() - defer svc.mu.Unlock() - if _, exist := svc.securityDomain[domain.SecurityDomain]; exist { - delete(svc.securityDomain, domain.SecurityDomain) - } + svc.securityDomain.Delete(domain.SecurityDomain) return domain, nil } } -func (svc *ConfigSvc) UpdateSecurityDomain(ctx context.Context, securityDomain *types.SecurityDomain) (*types.SecurityDomain, error) { - inter, err := svc.store.Update(ctx, securityDomain.SecurityDomain, securityDomain, store.WithResourceType(store.SecurityDomain)) +func (svc *ConfigSvc) UpdateSecurityDomain(ctx context.Context, securityDomain *types.SecurityDomain, opts ...store.OpOption) (*types.SecurityDomain, error) { + inter, err := svc.store.Update(ctx, securityDomain.SecurityDomain, securityDomain, append(opts, store.WithResourceType(store.SecurityDomain))...) if err != nil { return nil, err } domain := inter.(*types.SecurityDomain) - svc.mu.Lock() - defer svc.mu.Unlock() - if _, exist := svc.securityDomain[domain.SecurityDomain]; exist { - delete(svc.securityDomain, domain.SecurityDomain) - } - svc.securityDomain[domain.SecurityDomain] = domain + svc.securityDomain.SetDefault(domain.SecurityDomain, domain) return domain, nil } -func (svc *ConfigSvc) getSecurityDomain(ctx context.Context, securityDomain string) (*types.SecurityDomain, bool) { - svc.mu.Lock() - defer svc.mu.Unlock() - if cur, exist := svc.securityDomain[securityDomain]; exist { - return cur, true - } - return nil, false -} +func (svc *ConfigSvc) GetSecurityDomain(ctx context.Context, securityDomain string, opts ...store.OpOption) (*types.SecurityDomain, error) { + op := store.Op{} + op.ApplyOpts(opts) -func (svc *ConfigSvc) GetSecurityDomain(ctx context.Context, securityDomain string) (*types.SecurityDomain, error) { - if domain, exist := svc.getSecurityDomain(ctx, securityDomain); exist { - return domain, nil - } else if inter, err := svc.store.Get(ctx, securityDomain, store.WithResourceType(store.SecurityDomain)); err != nil { - return nil, err - } else { - domain := inter.(*types.SecurityDomain) - svc.mu.Lock() - defer svc.mu.Unlock() - if _, exist := svc.securityDomain[domain.SecurityDomain]; exist { - delete(svc.securityDomain, domain.SecurityDomain) + if !op.SkipLocalCache { + if domain, _, exist := svc.securityDomain.GetWithExpiration(securityDomain); exist { + return domain.(*types.SecurityDomain), nil } - svc.securityDomain[domain.SecurityDomain] = domain - return domain, nil } + + inter, err := svc.store.Get(ctx, securityDomain, append(opts, store.WithResourceType(store.SecurityDomain))...) 
+ if err != nil { + return nil, err + } + + domain := inter.(*types.SecurityDomain) + svc.securityDomain.SetDefault(domain.SecurityDomain, domain) + return domain, nil } func (svc *ConfigSvc) ListSecurityDomains(ctx context.Context, opts ...store.OpOption) ([]*types.SecurityDomain, error) { diff --git a/manager/dc/redis.go b/manager/dc/redis.go new file mode 100644 index 000000000..1fb05bb23 --- /dev/null +++ b/manager/dc/redis.go @@ -0,0 +1,71 @@ +package dc + +import ( + "context" + "fmt" + "time" + + "d7y.io/dragonfly/v2/manager/config" + "github.com/go-redis/redis/v8" +) + +type RedisClient struct { + Client *redis.Client + ClusterClient *redis.ClusterClient +} + +func NewRedisClient(cfg *config.RedisConfig) (*RedisClient, error) { + if err := cfg.Valid(); err != nil { + return nil, err + } + + client := &RedisClient{} + if len(cfg.Addrs) == 1 { + client.Client = redis.NewClient(&redis.Options{ + Username: cfg.User, + Password: cfg.Password, + Addr: cfg.Addrs[0], + }) + } else { + client.ClusterClient = redis.NewClusterClient(&redis.ClusterOptions{ + Username: cfg.User, + Password: cfg.Password, + Addrs: cfg.Addrs, + }) + } + + ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) + defer cancel() + + err := client.process(ctx, redis.NewStringCmd(ctx, "ping")) + if err != nil { + client.Close() + return nil, err + } + + return client, nil +} + +func (client *RedisClient) process(ctx context.Context, cmd redis.Cmder) error { + if client.Client != nil { + return client.Client.Process(ctx, cmd) + } + + if client.ClusterClient != nil { + return client.ClusterClient.Process(ctx, cmd) + } + + return fmt.Errorf("client and clusterClient are both nil") +} + +func (client *RedisClient) Close() error { + if client.Client != nil { + return client.Client.Close() + } + + if client.ClusterClient != nil { + return client.ClusterClient.Close() + } + + return nil +} diff --git a/manager/lease/id.go b/manager/lease/id.go new file mode 100644 index 000000000..ad8f93222 --- /dev/null +++ b/manager/lease/id.go @@ -0,0 +1,23 @@ +package lease + +import ( + "math/rand" + "time" +) + +const charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" + +var seededRand = rand.New( + rand.NewSource(time.Now().UnixNano())) + +func stringWithCharset(length int, charset string) string { + b := make([]byte, length) + for i := range b { + b[i] = charset[seededRand.Intn(len(charset))] + } + return string(b) +} + +func NewID() LeaseID { + return LeaseID(stringWithCharset(16, charset)) +} diff --git a/manager/lease/lease.go b/manager/lease/lease.go new file mode 100644 index 000000000..eab6152ae --- /dev/null +++ b/manager/lease/lease.go @@ -0,0 +1,227 @@ +package lease + +import ( + "context" + "sync" + "time" + + "d7y.io/dragonfly/v2/manager/store" + "d7y.io/dragonfly/v2/pkg/dfcodes" + "d7y.io/dragonfly/v2/pkg/dferrors" +) + +type LeaseID string //nolint + +func (id LeaseID) String() string { + return string(id) +} + +const ( + // defaultTTL is the assumed lease TTL used for the first keepalive + // deadline before the actual TTL is known to the client. + defaultTTL = 5 + // NoLease is a lease ID for the absence of a lease. + NoLease LeaseID = "" + + // retryConnWait is how long to wait before retrying request due to an error + retryConnWait = 500 * time.Millisecond +) + +type Lessor interface { + // Grant creates a new lease. + Grant(ctx context.Context, key, value string, ttl int64) (LeaseID, error) + + // Revoke revokes the given lease. 
+ Revoke(ctx context.Context, id LeaseID) + + // KeepAlive attempts to keep the given lease alive forever. + KeepAlive(ctx context.Context, id LeaseID) (chan struct{}, error) + + // Close releases all resources Lease keeps. + Close() error +} + +type keepAlive struct { + ch chan struct{} + // deadline is the time the keep alive channels close if no response + deadline time.Time + // nextKeepAlive is when to send the next keep alive message + nextKeepAlive time.Time +} + +type lessor struct { + mu sync.Mutex + stopC chan struct{} + keepAliveMap map[LeaseID]*keepAlive + store store.Store + wg sync.WaitGroup +} + +type Lease struct { + LeaseID LeaseID + Key string + Value string + TTL int64 +} + +func NewLessor(store store.Store) (Lessor, error) { + lessor := &lessor{ + mu: sync.Mutex{}, + stopC: make(chan struct{}), + keepAliveMap: make(map[LeaseID]*keepAlive), + store: store, + } + + lessor.wg.Add(2) + go lessor.deadlineLoop() + go lessor.keepAliveLoop() + return lessor, nil +} + +func (l *lessor) Grant(ctx context.Context, key, value string, ttl int64) (LeaseID, error) { + if ttl == 0 { + ttl = defaultTTL + } + + id := NewID() + lease := &Lease{ + LeaseID: id, + Key: key, + Value: value, + TTL: ttl, + } + + inter, err := l.store.Add(ctx, string(lease.LeaseID), lease, store.WithResourceType(store.Lease)) + if err != nil { + return NoLease, err + } + + id = inter.(*Lease).LeaseID + return id, nil +} + +func (l *lessor) Revoke(ctx context.Context, id LeaseID) { + _, _ = l.store.Delete(ctx, string(id), store.WithResourceType(store.Lease)) +} + +func (l *lessor) keepAlive(ctx context.Context, id LeaseID) *keepAlive { + l.mu.Lock() + defer l.mu.Unlock() + + ka, ok := l.keepAliveMap[id] + if !ok { + return nil + } + + return ka +} + +func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (chan struct{}, error) { + ka := l.keepAlive(ctx, id) + if ka != nil { + return ka.ch, nil + } + + inter, err := l.store.Get(ctx, string(id), store.WithResourceType(store.Lease)) + if err != nil { + return nil, err + } + + lease := inter.(*Lease) + ka = &keepAlive{ + ch: make(chan struct{}), + deadline: time.Now().Add(time.Duration(lease.TTL) * time.Second), + nextKeepAlive: time.Now().Add(time.Duration(lease.TTL) * time.Second / 3.0), + } + + l.mu.Lock() + defer l.mu.Unlock() + if _, ok := l.keepAliveMap[id]; !ok { + l.keepAliveMap[id] = ka + } + + return ka.ch, nil +} + +func (l *lessor) deadlineLoop() { + defer l.wg.Done() + + for { + select { + case <-time.After(time.Second): + case <-l.stopC: + return + } + now := time.Now() + l.mu.Lock() + for id, ka := range l.keepAliveMap { + if ka.deadline.Before(now) { + // waited too long for response; lease may be expired + ka.close() + delete(l.keepAliveMap, id) + } + } + l.mu.Unlock() + } +} + +func (l *lessor) keepAliveLoop() { + defer l.wg.Done() + + for { + var toKeep []LeaseID + + now := time.Now() + l.mu.Lock() + for id, ka := range l.keepAliveMap { + if ka.nextKeepAlive.Before(now) { + toKeep = append(toKeep, id) + } + } + l.mu.Unlock() + + for _, id := range toKeep { + lease := &Lease{ + LeaseID: id, + Key: "", + Value: "", + TTL: 0, + } + + inter, err := l.store.Update(context.TODO(), string(lease.LeaseID), lease, store.WithResourceType(store.Lease)) + if err == nil { + lease = inter.(*Lease) + l.mu.Lock() + if ka, ok := l.keepAliveMap[id]; ok { + ka.deadline = time.Now().Add(time.Duration(lease.TTL) * time.Second) + ka.nextKeepAlive = time.Now().Add(time.Duration(lease.TTL) * time.Second / 3.0) + } + l.mu.Unlock() + } else if dferrors.CheckError(err, 
dfcodes.ManagerStoreNotFound) { + l.mu.Lock() + if ka, ok := l.keepAliveMap[id]; ok { + ka.close() + delete(l.keepAliveMap, id) + } + l.mu.Unlock() + } + } + + select { + case <-time.After(retryConnWait): + case <-l.stopC: + return + } + } +} + +func (l *lessor) Close() error { + close(l.stopC) + l.wg.Wait() + l.stopC = nil + return nil +} + +func (ka *keepAlive) close() { + ka.ch <- struct{}{} +} diff --git a/manager/server/lease_test.go b/manager/server/lease_test.go new file mode 100644 index 000000000..ef3c2009f --- /dev/null +++ b/manager/server/lease_test.go @@ -0,0 +1,206 @@ +package server + +import ( + "context" + "fmt" + "sync" + "testing" + "time" + + "d7y.io/dragonfly/v2/manager/config" + "d7y.io/dragonfly/v2/manager/lease" + "d7y.io/dragonfly/v2/manager/store/client" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/suite" +) + +type LeaseTestSuite struct { + suite.Suite + lessor lease.Lessor +} + +func (suite *LeaseTestSuite) TestGrantSuccess() { + assert := assert.New(suite.T()) + + randStr := suite.randPrefix() + id1, err := suite.lessor.Grant(context.TODO(), randStr+"key1", randStr+"value1", 10) + assert.Nil(err) + assert.NotEmpty(id1) + + id2, err := suite.lessor.Grant(context.TODO(), randStr+"key2", randStr+"value2", 20) + assert.Nil(err) + assert.NotEmpty(id2) + + if len(id1) > 0 { + suite.lessor.Revoke(context.TODO(), id1) + } + + if len(id2) > 0 { + suite.lessor.Revoke(context.TODO(), id2) + } +} + +func (suite *LeaseTestSuite) TestGrantFailed() { + assert := assert.New(suite.T()) + + randStr := suite.randPrefix() + id1, err := suite.lessor.Grant(context.TODO(), randStr+"key1", randStr+"value1", 10) + assert.Nil(err) + assert.NotEmpty(id1) + + id2, err := suite.lessor.Grant(context.TODO(), randStr+"key1", randStr+"value1", 20) + assert.NotNil(err) + assert.Empty(id2) + + time.Sleep(time.Duration(5) * time.Second) + id3, err := suite.lessor.Grant(context.TODO(), randStr+"key1", randStr+"value1", 20) + assert.NotNil(err) + assert.Empty(id3) + + time.Sleep(time.Duration(6) * time.Second) + id4, err := suite.lessor.Grant(context.TODO(), randStr+"key1", randStr+"value1", 20) + assert.Nil(err) + assert.NotEmpty(id4) + + if len(id1) > 0 { + suite.lessor.Revoke(context.TODO(), id1) + } + + if len(id2) > 0 { + suite.lessor.Revoke(context.TODO(), id2) + } + + if len(id3) > 0 { + suite.lessor.Revoke(context.TODO(), id3) + } + + if len(id4) > 0 { + suite.lessor.Revoke(context.TODO(), id4) + } +} + +func (suite *LeaseTestSuite) TestKeepAliveSuccess() { + assert := assert.New(suite.T()) + + randStr := suite.randPrefix() + id1, err := suite.lessor.Grant(context.TODO(), randStr+"key1", randStr+"value1", 10) + assert.Nil(err) + assert.NotEmpty(id1) + + if len(id1) > 0 { + ch, err := suite.lessor.KeepAlive(context.TODO(), id1) + assert.Nil(err) + assert.NotNil(ch) + } + + time.Sleep(time.Duration(15) * time.Second) + id2, err := suite.lessor.Grant(context.TODO(), randStr+"key1", randStr+"value1", 10) + assert.NotNil(err) + assert.Empty(id2) + + if len(id1) > 0 { + suite.lessor.Revoke(context.TODO(), id1) + } + + if len(id2) > 0 { + suite.lessor.Revoke(context.TODO(), id2) + } +} + +func (suite *LeaseTestSuite) TestKeepAliveTimeout() { + assert := assert.New(suite.T()) + + randStr := suite.randPrefix() + id1, err := suite.lessor.Grant(context.TODO(), randStr+"key1", randStr+"value1", 10) + assert.Nil(err) + assert.NotEmpty(id1) + + if len(id1) > 0 { + ch, err := suite.lessor.KeepAlive(context.TODO(), id1) + assert.Nil(err) + assert.NotNil(ch) + } + + 
time.Sleep(time.Duration(15) * time.Second) + id2, err := suite.lessor.Grant(context.TODO(), randStr+"key1", randStr+"value1", 10) + assert.NotNil(err) + assert.Empty(id2) + + suite.lessor.Close() + defer func() { + suite.lessor = nil + }() + + time.Sleep(time.Duration(15) * time.Second) + id3, err := suite.lessor.Grant(context.TODO(), randStr+"key1", randStr+"value1", 10) + assert.Nil(err) + assert.NotEmpty(id3) + + if len(id1) > 0 { + suite.lessor.Revoke(context.TODO(), id1) + } + + if len(id2) > 0 { + suite.lessor.Revoke(context.TODO(), id2) + } + + if len(id3) > 0 { + suite.lessor.Revoke(context.TODO(), id2) + } +} + +func (suite *LeaseTestSuite) TestKeepAliveMeetRevoke() { + assert := assert.New(suite.T()) + + randStr := suite.randPrefix() + id1, err := suite.lessor.Grant(context.TODO(), randStr+"key1", randStr+"value1", 10) + assert.Nil(err) + assert.NotEmpty(id1) + + ch, err := suite.lessor.KeepAlive(context.TODO(), id1) + assert.Nil(err) + assert.NotNil(ch) + + var wg sync.WaitGroup + wg.Add(1) + go func(ch chan struct{}) { + defer wg.Done() + for { + select { + case <-ch: + return + } + } + }(ch) + + time.Sleep(time.Duration(15) * time.Second) + suite.lessor.Revoke(context.TODO(), id1) + + wg.Wait() +} + +func (suite *LeaseTestSuite) randPrefix() string { + return fmt.Sprintf("%d_", time.Now().Unix()) +} + +func (suite *LeaseTestSuite) SetupTest() { + assert := assert.New(suite.T()) + + cfg := config.New() + store, err := client.NewStore(cfg) + assert.Nil(err) + assert.NotNil(store) + + suite.lessor, err = lease.NewLessor(store) + assert.Nil(err) +} + +func (suite *LeaseTestSuite) TearDownTest() { + if suite.lessor != nil { + suite.lessor.Close() + } +} + +func TestLeaseTestSuite(t *testing.T) { + suite.Run(t, new(LeaseTestSuite)) +} diff --git a/manager/server/server.go b/manager/server/server.go index c4faf9f39..c9456e770 100644 --- a/manager/server/server.go +++ b/manager/server/server.go @@ -94,11 +94,24 @@ func (s *Server) Serve() error { } func (s *Server) Stop() { + if s.ms != nil { + err := s.ms.Close() + if err != nil { + logger.Errorf("failed to stop manager server: %+v", err) + } + + s.ms = nil + } + rpc.StopServer() ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) defer cancel() - if err := s.httpServer.Shutdown(ctx); err != nil { + + err := s.httpServer.Shutdown(ctx) + if err != nil { logger.Errorf("failed to stop manager http server: %+v", err) } + + s.httpServer = nil } diff --git a/manager/server/server_test.go b/manager/server/server_test.go index 7b428d18d..068750458 100644 --- a/manager/server/server_test.go +++ b/manager/server/server_test.go @@ -25,32 +25,6 @@ type ServerTestSuite struct { client client.ManagerClient } -func (suite *ServerTestSuite) testMysqlConfig() *config.Config { - return &config.Config{ - Server: &config.ServerConfig{ - Port: 8004, - }, - Configure: &config.ConfigureConfig{ - StoreName: "store1", - }, - Stores: []*config.StoreConfig{ - { - Name: "store1", - Type: "mysql", - Mysql: &config.MysqlConfig{ - User: "root", - Password: "root1234", - IP: "127.0.0.1", - Port: 3306, - Db: "config_db", - }, - Oss: nil, - }, - }, - HostService: &config.HostService{}, - } -} - func (suite *ServerTestSuite) testDefaultSchedulerCluster() *types.SchedulerCluster { schedulerConfigMap := map[string]string{ "schedulerConfig_a": "a", @@ -86,74 +60,7 @@ func (suite *ServerTestSuite) testDefaultCDNCluster() *types.CDNCluster { } } -// -//func (suite *ServerTestSuite) TestGetSchedulers() { -// assert := assert.New(suite.T()) -// -// var cluster 
*types.SchedulerCluster -// { -// cluster = suite.testDefaultSchedulerCluster() -// ret, err := suite.server.ms.AddSchedulerCluster(context.TODO(), cluster) -// assert.NotNil(ret) -// assert.Nil(err) -// cluster.ClusterID = ret.ClusterID -// } -// -// hostName := "magneto-controller011162004111.nt12" -// ip := "11.162.4.111" -// var instance *types.SchedulerInstance -// { -// instance = &types.SchedulerInstance{ -// ClusterID: cluster.ClusterID, -// SecurityDomain: "security_abc", -// IDC: "idc_abc", -// Location: "location_abc", -// NetConfig: "", -// HostName: hostName, -// IP: ip, -// Port: 80, -// } -// -// ret, err := suite.server.ms.AddSchedulerInstance(context.TODO(), instance) -// assert.NotNil(ret) -// assert.Nil(err) -// instance.InstanceID = ret.InstanceID -// } -// -// { -// req := &manager.GetSchedulersRequest{ -// IP: ip, -// HostName: hostName, -// } -// -// ret, err := suite.server.ms.GetSchedulers(context.TODO(), req) -// assert.NotNil(err) -// assert.Nil(ret) -// } -// -// { -// req := &manager.KeepAliveRequest{ -// HostName: hostName, -// Type: manager.ResourceType_Scheduler, -// } -// -// err := suite.server.ms.KeepAlive(context.TODO(), req) -// assert.Nil(err) -// } -// -// { -// req := &manager.GetSchedulersRequest{ -// IP: ip, -// HostName: hostName, -// } -// -// ret, err := suite.server.ms.GetSchedulers(context.TODO(), req) -// assert.Nil(err) -// assert.NotNil(ret) -// } -//} -// -func (suite *ServerTestSuite) TestKeepAlive() { +func (suite *ServerTestSuite) TestGetSchedulers() { assert := assert.New(suite.T()) var cluster *types.SchedulerCluster @@ -173,7 +80,7 @@ func (suite *ServerTestSuite) TestKeepAlive() { IDC: "idc_abc", Location: "location_abc", NetConfig: "", - HostName: "hostname_abc", + HostName: suite.randPrefix() + "hostname_abc", IP: "192.168.0.11", Port: 80, } @@ -184,9 +91,61 @@ func (suite *ServerTestSuite) TestKeepAlive() { instance.InstanceID = ret.InstanceID } - for i := 0; i < 20; i++ { + { req := &manager.KeepAliveRequest{ - HostName: "hostname_abc", + HostName: instance.HostName, + Type: manager.ResourceType_Scheduler, + } + + err := suite.server.ms.KeepAlive(context.TODO(), req) + assert.Nil(err) + } + + { + req := &manager.GetSchedulersRequest{ + HostName: instance.HostName, + } + + ret, err := suite.server.ms.GetSchedulers(context.TODO(), req) + assert.Nil(err) + assert.NotNil(ret) + } +} + +func (suite *ServerTestSuite) TestKeepAliveScheduler() { + assert := assert.New(suite.T()) + + var cluster *types.SchedulerCluster + { + cluster = suite.testDefaultSchedulerCluster() + ret, err := suite.server.ms.AddSchedulerCluster(context.TODO(), cluster) + assert.NotNil(ret) + assert.Nil(err) + cluster.ClusterID = ret.ClusterID + } + + var instance *types.SchedulerInstance + { + instance = &types.SchedulerInstance{ + ClusterID: cluster.ClusterID, + SecurityDomain: "security_abc", + IDC: "idc_abc", + Location: "location_abc", + NetConfig: "", + HostName: suite.randPrefix() + "hostname_abc", + IP: "192.168.0.11", + Port: 80, + } + + ret, err := suite.server.ms.AddSchedulerInstance(context.TODO(), instance) + assert.NotNil(ret) + assert.Nil(err) + instance.InstanceID = ret.InstanceID + } + + for i := 0; i < 10; i++ { + req := &manager.KeepAliveRequest{ + HostName: instance.HostName, Type: manager.ResourceType_Scheduler, } @@ -211,48 +170,225 @@ func (suite *ServerTestSuite) TestKeepAlive() { } } -//func (suite *ServerTestSuite) TestGetClusterConfig() { -// assert := assert.New(suite.T()) -// -// var cluster *types.SchedulerCluster -// { -// cluster = 
suite.testDefaultSchedulerCluster() -// ret, err := suite.server.ms.AddSchedulerCluster(context.TODO(), cluster) -// assert.NotNil(ret) -// assert.Nil(err) -// cluster.ClusterID = ret.ClusterID -// } -// -// var instance *types.SchedulerInstance -// { -// instance = &types.SchedulerInstance{ -// ClusterID: cluster.ClusterID, -// SecurityDomain: "security_abc", -// IDC: "idc_abc", -// Location: "location_abc", -// NetConfig: "", -// HostName: "dragonfly2-scheduler011239070235.nt12", -// IP: "11.239.70.235", -// Port: 80, -// } -// -// ret, err := suite.server.ms.AddSchedulerInstance(context.TODO(), instance) -// assert.NotNil(ret) -// assert.Nil(err) -// instance.InstanceID = ret.InstanceID -// } -// -// { -// req := &manager.GetClusterConfigRequest{ -// HostName: "dragonfly2-scheduler011239070235.nt12", -// Type: manager.ResourceType_Scheduler, -// } -// -// ret, err := suite.server.ms.GetClusterConfig(context.TODO(), req) -// assert.NotNil(err) -// assert.Nil(ret) -// } -//} +func (suite *ServerTestSuite) TestKeepAliveCDN() { + assert := assert.New(suite.T()) + + var cluster *types.CDNCluster + { + cluster = suite.testDefaultCDNCluster() + ret, err := suite.server.ms.AddCDNCluster(context.TODO(), cluster) + assert.NotNil(ret) + assert.Nil(err) + cluster.ClusterID = ret.ClusterID + } + + var instance *types.CDNInstance + { + instance = &types.CDNInstance{ + ClusterID: cluster.ClusterID, + IDC: "idc_abc", + Location: "location_abc", + HostName: suite.randPrefix() + "hostname_abc", + IP: "ip_abc", + Port: 0, + RPCPort: 0, + DownPort: 0, + } + + ret, err := suite.server.ms.AddCDNInstance(context.TODO(), instance) + assert.NotNil(ret) + assert.Nil(err) + instance.InstanceID = ret.InstanceID + } + + for i := 0; i < 10; i++ { + req := &manager.KeepAliveRequest{ + HostName: instance.HostName, + Type: manager.ResourceType_Cdn, + } + + err := suite.server.ms.KeepAlive(context.TODO(), req) + assert.Nil(err) + + if i%2 == 0 { + time.Sleep(configsvc.KeepAliveTimeoutMax - time.Second) + + ret, err := suite.server.ms.GetCDNInstance(context.TODO(), instance.InstanceID) + assert.NotNil(ret) + assert.Nil(err) + assert.Equal(configsvc.InstanceActive, ret.State) + } else { + time.Sleep(configsvc.KeepAliveTimeoutMax * 2) + + ret, err := suite.server.ms.GetCDNInstance(context.TODO(), instance.InstanceID) + assert.NotNil(ret) + assert.Nil(err) + assert.Equal(configsvc.InstanceInactive, ret.State) + } + } +} + +func (suite *ServerTestSuite) TestGetSchedulerClusterConfig() { + assert := assert.New(suite.T()) + + var cluster *types.SchedulerCluster + { + cluster = suite.testDefaultSchedulerCluster() + ret, err := suite.server.ms.AddSchedulerCluster(context.TODO(), cluster) + assert.NotNil(ret) + assert.Nil(err) + cluster.ClusterID = ret.ClusterID + } + + var instance *types.SchedulerInstance + { + instance = &types.SchedulerInstance{ + ClusterID: cluster.ClusterID, + SecurityDomain: "security_abc", + IDC: "idc_abc", + Location: "location_abc", + NetConfig: "", + HostName: suite.randPrefix() + "hostname_abc", + IP: "192.168.0.11", + Port: 80, + } + + ret, err := suite.server.ms.AddSchedulerInstance(context.TODO(), instance) + assert.NotNil(ret) + assert.Nil(err) + instance.InstanceID = ret.InstanceID + } + + { + req := &manager.GetClusterConfigRequest{ + HostName: instance.HostName, + Type: manager.ResourceType_Scheduler, + } + + ret, err := suite.server.ms.GetClusterConfig(context.TODO(), req) + assert.Nil(err) + assert.NotNil(ret) + cfg := ret.GetSchedulerConfig() + assert.Equal(cluster.SchedulerConfig, 
cfg.ClusterConfig) + assert.Equal(cluster.ClientConfig, cfg.ClientConfig) + } + + var cdnCluster *types.CDNCluster + { + cdnCluster = suite.testDefaultCDNCluster() + ret, err := suite.server.ms.AddCDNCluster(context.TODO(), cdnCluster) + assert.NotNil(ret) + assert.Nil(err) + cdnCluster.ClusterID = ret.ClusterID + } + + var cdnInstance *types.CDNInstance + { + cdnInstance = &types.CDNInstance{ + ClusterID: cdnCluster.ClusterID, + IDC: "idc_abc", + Location: "location_abc", + HostName: suite.randPrefix() + "hostname_abc", + IP: "ip_abc", + Port: 0, + RPCPort: 0, + DownPort: 0, + } + + ret, err := suite.server.ms.AddCDNInstance(context.TODO(), cdnInstance) + assert.NotNil(ret) + assert.Nil(err) + cdnInstance.InstanceID = ret.InstanceID + } + + { + req := &manager.KeepAliveRequest{ + HostName: cdnInstance.HostName, + Type: manager.ResourceType_Cdn, + } + + err := suite.server.ms.KeepAlive(context.TODO(), req) + assert.Nil(err) + } + + { + var schedulerConfigMap map[string]string + err := json.Unmarshal([]byte(cluster.SchedulerConfig), &schedulerConfigMap) + assert.Nil(err) + + schedulerConfigMap["CDN_CLUSTER_ID"] = cdnCluster.ClusterID + schedulerConfigByte, err := json.Marshal(schedulerConfigMap) + assert.Nil(err) + cluster.SchedulerConfig = string(schedulerConfigByte) + + suite.server.ms.UpdateSchedulerCluster(context.TODO(), cluster) + } + + { + req := &manager.GetClusterConfigRequest{ + HostName: instance.HostName, + Type: manager.ResourceType_Scheduler, + } + + ret, err := suite.server.ms.GetClusterConfig(context.TODO(), req) + assert.Nil(err) + assert.NotNil(ret) + cfg := ret.GetSchedulerConfig() + assert.Equal(cluster.SchedulerConfig, cfg.ClusterConfig) + assert.Equal(cluster.ClientConfig, cfg.ClientConfig) + + cdnHost := cfg.GetCdnHosts() + assert.Equal(1, len(cdnHost)) + + assert.Equal(cdnInstance.HostName, cdnHost[0].HostInfo.HostName) + assert.Equal(cdnInstance.IP, cdnHost[0].HostInfo.Ip) + } +} + +func (suite *ServerTestSuite) TestGetCDNClusterConfig() { + assert := assert.New(suite.T()) + + var cluster *types.CDNCluster + { + cluster = suite.testDefaultCDNCluster() + ret, err := suite.server.ms.AddCDNCluster(context.TODO(), cluster) + assert.NotNil(ret) + assert.Nil(err) + cluster.ClusterID = ret.ClusterID + } + + var instance *types.CDNInstance + { + instance = &types.CDNInstance{ + ClusterID: cluster.ClusterID, + IDC: "idc_abc", + Location: "location_abc", + HostName: suite.randPrefix() + "hostname_abc", + IP: "ip_abc", + Port: 0, + RPCPort: 0, + DownPort: 0, + } + + ret, err := suite.server.ms.AddCDNInstance(context.TODO(), instance) + assert.NotNil(ret) + assert.Nil(err) + instance.InstanceID = ret.InstanceID + } + + { + req := &manager.GetClusterConfigRequest{ + HostName: instance.HostName, + Type: manager.ResourceType_Cdn, + } + + ret, err := suite.server.ms.GetClusterConfig(context.TODO(), req) + assert.Nil(err) + assert.NotNil(ret) + cfg := ret.GetCdnConfig() + assert.Equal(cluster.Config, cfg.ClusterConfig) + } +} func (suite *ServerTestSuite) TestSchedulerCluster() { assert := assert.New(suite.T()) @@ -341,7 +477,7 @@ func (suite *ServerTestSuite) TestSchedulerInstance() { IDC: "idc_abc", Location: "location_abc", NetConfig: "", - HostName: "hostname_abc", + HostName: suite.randPrefix() + "hostname_abc", IP: "192.168.0.11", Port: 80, } @@ -481,7 +617,7 @@ func (suite *ServerTestSuite) TestCDNInstance() { ClusterID: cluster.ClusterID, IDC: "idc_abc", Location: "location_abc", - HostName: "hostName_abc", + HostName: suite.randPrefix() + "hostname_abc", IP: "ip_abc", Port: 
0, RPCPort: 0, @@ -619,12 +755,16 @@ func (suite *ServerTestSuite) TestSecurityDomain() { } } +func (suite *ServerTestSuite) randPrefix() string { + return fmt.Sprintf("%d_", time.Now().Unix()) +} + func (suite *ServerTestSuite) SetupSuite() { assert := assert.New(suite.T()) configsvc.KeepAliveTimeoutMax = 2 * time.Second _ = logcore.InitManager(false) - cfg := suite.testMysqlConfig() + cfg := config.New() server, err := New(cfg) assert.Nil(err) assert.NotNil(server) diff --git a/manager/server/service/manager_server.go b/manager/server/service/manager_server.go index 79d4dfc8d..ede4b648b 100644 --- a/manager/server/service/manager_server.go +++ b/manager/server/service/manager_server.go @@ -7,8 +7,10 @@ import ( "d7y.io/dragonfly/v2/manager/apis/v2/types" "d7y.io/dragonfly/v2/manager/config" "d7y.io/dragonfly/v2/manager/configsvc" + "d7y.io/dragonfly/v2/manager/dc" "d7y.io/dragonfly/v2/manager/host" "d7y.io/dragonfly/v2/manager/hostidentifier" + "d7y.io/dragonfly/v2/manager/lease" "d7y.io/dragonfly/v2/manager/store" "d7y.io/dragonfly/v2/manager/store/client" "d7y.io/dragonfly/v2/pkg/dfcodes" @@ -22,36 +24,70 @@ type ManagerServer struct { store store.Store hostManager host.Manager configSvc *configsvc.ConfigSvc + lessor lease.Lessor + redisClient *dc.RedisClient } func NewManagerServer(cfg *config.Config) *ManagerServer { - if err := cfg.Valid(); err != nil { + var err error + mgr := &ManagerServer{} + defer func() { + if err != nil { + mgr.Close() + } + }() + + if err = cfg.Valid(); err != nil { return nil } - identifier := hostidentifier.NewIdentifier() + mgr.identifier = hostidentifier.NewIdentifier() - store, err := client.NewStore(cfg) + mgr.store, err = client.NewStore(cfg) if err != nil { return nil } - configSvc, err := configsvc.NewConfigSvc(store, identifier) + mgr.lessor, err = lease.NewLessor(mgr.store) if err != nil { return nil } - hostManager, err := host.NewManager(cfg.HostService) + mgr.redisClient, err = dc.NewRedisClient(cfg.Redis) if err != nil { return nil } - return &ManagerServer{ - identifier: identifier, - store: store, - hostManager: hostManager, - configSvc: configSvc, + mgr.configSvc, err = configsvc.NewConfigSvc(mgr.store, mgr.identifier, mgr.lessor, mgr.redisClient) + if err != nil { + return nil } + + mgr.hostManager, err = host.NewManager(cfg.HostService) + if err != nil { + return nil + } + + return mgr +} + +func (ms *ManagerServer) Close() error { + if ms.configSvc != nil { + _ = ms.configSvc.Close() + ms.configSvc = nil + } + + if ms.redisClient != nil { + _ = ms.redisClient.Close() + ms.redisClient = nil + } + + if ms.lessor != nil { + _ = ms.lessor.Close() + ms.lessor = nil + } + + return nil } func (ms *ManagerServer) GetSchedulers(ctx context.Context, req *manager.GetSchedulersRequest) (*manager.SchedulerNodes, error) { diff --git a/manager/store/orm/cdn_instance.go b/manager/store/orm/cdn_instance.go index 3369abc7a..ab0556098 100644 --- a/manager/store/orm/cdn_instance.go +++ b/manager/store/orm/cdn_instance.go @@ -137,6 +137,9 @@ func (s *CDNInstanceStore) Delete(ctx context.Context, id string, opts ...store. 
} func (s *CDNInstanceStore) Update(ctx context.Context, id string, data interface{}, opts ...store.OpOption) (interface{}, error) { + op := store.Op{} + op.ApplyOpts(opts) + tInstance := &CDNInstanceTable{} tx := s.withTable(ctx).Where("instance_id = ?", id).First(tInstance) if err := tx.Error; errors.Is(err, gorm.ErrRecordNotFound) { @@ -149,6 +152,11 @@ func (s *CDNInstanceStore) Update(ctx context.Context, id string, data interface } instance := CDNInstanceToTable(i) + if op.Keepalive { + tInstance.State = instance.State + instance = tInstance + } + s.updateSchemaToTable(instance, tInstance) tx = tx.Updates(instance) if err := tx.Error; errors.Is(err, gorm.ErrRecordNotFound) { diff --git a/manager/store/orm/lease.go b/manager/store/orm/lease.go new file mode 100644 index 000000000..d1523d778 --- /dev/null +++ b/manager/store/orm/lease.go @@ -0,0 +1,191 @@ +package orm + +import ( + "context" + "errors" + "time" + + "d7y.io/dragonfly/v2/manager/lease" + "d7y.io/dragonfly/v2/manager/store" + "d7y.io/dragonfly/v2/pkg/dfcodes" + "d7y.io/dragonfly/v2/pkg/dferrors" + "gorm.io/gorm" +) + +type LeaseTable struct { + ID uint `gorm:"column:id;primaryKey"` + LeaseID string `gorm:"column:lease_id;unique;size:63"` + Key string `gorm:"column:lease_key;unique;size:255"` + Value string `gorm:"column:lease_value;size:255"` + TTL int64 `gorm:"column:ttl"` + CreatedAt time.Time `gorm:"column:created_at"` + UpdatedAt time.Time `gorm:"column:updated_at"` +} + +type LeaseStore struct { + resourceType store.ResourceType + db *gorm.DB + table string +} + +func NewLeaseStore(db *gorm.DB, table string) (store.Store, error) { + s := &LeaseStore{ + resourceType: store.Lease, + db: db, + table: table, + } + + err := s.withTable(context.Background()).AutoMigrate(&LeaseTable{}) + if err != nil { + return nil, err + } + + return s, nil +} + +func LeaseToTable(t *lease.Lease) *LeaseTable { + return &LeaseTable{ + LeaseID: string(t.LeaseID), + Key: t.Key, + Value: t.Value, + TTL: t.TTL, + } +} + +func LeaseToSchema(t *LeaseTable) *lease.Lease { + return &lease.Lease{ + LeaseID: lease.LeaseID(t.LeaseID), + Key: t.Key, + Value: t.Value, + TTL: t.TTL, + } +} + +func (s *LeaseStore) updateSchemaToTable(new, old *LeaseTable) *LeaseTable { + new.ID = old.ID + new.LeaseID = old.LeaseID + new.Key = old.Key + new.Value = old.Value + new.TTL = old.TTL + return new +} + +func (s *LeaseStore) withTable(ctx context.Context) (tx *gorm.DB) { + return s.db.WithContext(ctx).Table(s.table) +} + +func (s *LeaseStore) Add(ctx context.Context, id string, data interface{}, opts ...store.OpOption) (interface{}, error) { + l, ok := data.(*lease.Lease) + if !ok { + return nil, dferrors.Newf(dfcodes.ManagerStoreError, "add lease error: reflect lease error") + } + + le := LeaseToTable(l) + tx := s.withTable(ctx).Create(le) + + if tx.Error != nil { + err := tx.Error + + tLease := &LeaseTable{} + tx = s.withTable(ctx).Where("lease_key = ?", le.Key).First(tLease) + if tx.Error != nil { + return nil, dferrors.Newf(dfcodes.ManagerStoreError, "add lease error: %s", tx.Error.Error()) + } + + now := time.Now() + if now.Before(tLease.UpdatedAt.Add(time.Duration(tLease.TTL) * time.Second)) { + return nil, err + } + + tx = tx.Unscoped().Delete(tLease) + if tx.Error != nil { + return nil, tx.Error + } + + tx = tx.Create(le) + if tx.Error != nil { + return nil, tx.Error + } + } + + return LeaseToSchema(le), nil +} + +func (s *LeaseStore) Delete(ctx context.Context, id string, opts ...store.OpOption) (interface{}, error) { + le := &LeaseTable{} + tx := 
s.withTable(ctx).Where("lease_id = ?", id).First(le) + if err := tx.Error; errors.Is(err, gorm.ErrRecordNotFound) { + return nil, nil + } + + if tx.Error != nil { + return nil, dferrors.Newf(dfcodes.ManagerStoreError, "delete lease error: %s", tx.Error.Error()) + } + + tx = tx.Unscoped().Delete(le) + if err := tx.Error; errors.Is(err, gorm.ErrRecordNotFound) { + return nil, nil + } + + if tx.Error != nil { + return nil, dferrors.Newf(dfcodes.ManagerStoreError, "delete lease error: %s", tx.Error.Error()) + } + + return LeaseToSchema(le), nil +} + +func (s *LeaseStore) Update(ctx context.Context, id string, data interface{}, opts ...store.OpOption) (interface{}, error) { + tLease := &LeaseTable{} + tx := s.withTable(ctx).Where("lease_id = ?", id).First(tLease) + if err := tx.Error; errors.Is(err, gorm.ErrRecordNotFound) { + return nil, dferrors.Newf(dfcodes.ManagerStoreNotFound, "update lease error: %s", err.Error()) + } + + l, ok := data.(*lease.Lease) + if !ok { + return nil, dferrors.Newf(dfcodes.ManagerStoreError, "update lease error: reflect lease error") + } + + le := LeaseToTable(l) + s.updateSchemaToTable(le, tLease) + tx = tx.Updates(le) + if err := tx.Error; errors.Is(err, gorm.ErrRecordNotFound) { + return nil, dferrors.Newf(dfcodes.ManagerStoreNotFound, "update lease error: %s", err.Error()) + } else if tx.Error != nil { + return nil, dferrors.Newf(dfcodes.ManagerStoreError, "update lease error: %s", tx.Error.Error()) + } else { + return LeaseToSchema(le), nil + } +} + +func (s *LeaseStore) Get(ctx context.Context, id string, opts ...store.OpOption) (interface{}, error) { + le := &LeaseTable{} + tx := s.withTable(ctx).Where("lease_id = ?", id).First(le) + if err := tx.Error; errors.Is(err, gorm.ErrRecordNotFound) { + return nil, dferrors.Newf(dfcodes.ManagerStoreNotFound, "get lease error: %s", err.Error()) + } else if tx.Error != nil { + return nil, dferrors.Newf(dfcodes.ManagerStoreError, "get lease error: %s", tx.Error.Error()) + } else { + return LeaseToSchema(le), nil + } +} + +func (s *LeaseStore) List(ctx context.Context, opts ...store.OpOption) ([]interface{}, error) { + op := store.Op{} + op.ApplyOpts(opts) + + var leases []*LeaseTable + var tx *gorm.DB + tx = s.withTable(ctx).Order("lease_id").Offset(op.Marker).Limit(op.MaxItemCount).Find(&leases) + + if tx.Error != nil { + return nil, dferrors.Newf(dfcodes.ManagerStoreError, "list security domain error %s", tx.Error.Error()) + } + + var inter []interface{} + for _, le := range leases { + inter = append(inter, LeaseToSchema(le)) + } + + return inter, nil +} diff --git a/manager/store/orm/orm_store.go b/manager/store/orm/orm_store.go index 7981cacd8..67f4dab21 100644 --- a/manager/store/orm/orm_store.go +++ b/manager/store/orm/orm_store.go @@ -2,7 +2,6 @@ package orm import ( "context" - "fmt" "net/url" "d7y.io/dragonfly/v2/manager/config" @@ -28,6 +27,7 @@ var ormTables = map[store.ResourceType]newTableSetup{ store.CDNCluster: NewCDNClusterStore, store.CDNInstance: NewCDNInstanceStore, store.SecurityDomain: NewSecurityDomainStore, + store.Lease: NewLeaseStore, } func newOrmStore(cfg *config.StoreConfig) (*ormStore, error) { @@ -40,7 +40,7 @@ func newOrmStore(cfg *config.StoreConfig) (*ormStore, error) { return nil, err } - u.Host = fmt.Sprintf("%s:%d", cfg.Mysql.IP, cfg.Mysql.Port) + u.Host = cfg.Mysql.Addr u.Path = cfg.Mysql.Db u.User = url.UserPassword(cfg.Mysql.User, cfg.Mysql.Password) q := u.Query() diff --git a/manager/store/orm/scheduler_instance.go b/manager/store/orm/scheduler_instance.go index 
ac75f4e26..2d8360c8b 100644 --- a/manager/store/orm/scheduler_instance.go +++ b/manager/store/orm/scheduler_instance.go @@ -142,6 +142,8 @@ func (s *SchedulerInstanceStore) Delete(ctx context.Context, id string, opts ... } func (s *SchedulerInstanceStore) Update(ctx context.Context, id string, data interface{}, opts ...store.OpOption) (interface{}, error) { + op := store.Op{} + op.ApplyOpts(opts) tInstance := &SchedulerInstanceTable{} tx := s.withTable(ctx).Where("instance_id = ?", id).First(tInstance) @@ -155,6 +157,11 @@ func (s *SchedulerInstanceStore) Update(ctx context.Context, id string, data int } instance := SchedulerInstanceToTable(i) + if op.Keepalive { + tInstance.State = instance.State + instance = tInstance + } + s.updateSchemaToTable(instance, tInstance) tx = tx.Updates(instance) if err := tx.Error; errors.Is(err, gorm.ErrRecordNotFound) { diff --git a/manager/store/store.go b/manager/store/store.go index ab5b5bea0..5a2f56102 100644 --- a/manager/store/store.go +++ b/manager/store/store.go @@ -12,6 +12,7 @@ const ( CDNCluster ResourceType = "CDNCluster" CDNInstance ResourceType = "CDNInstance" SecurityDomain ResourceType = "SecurityDomain" + Lease ResourceType = "Lease" ) func (objType ResourceType) String() string { @@ -19,11 +20,13 @@ func (objType ResourceType) String() string { } type Op struct { - ResourceType ResourceType - ClusterID string - InstanceID string - Marker int - MaxItemCount int + ResourceType ResourceType + ClusterID string + InstanceID string + Marker int + MaxItemCount int + Keepalive bool + SkipLocalCache bool } type OpOption func(*Op) @@ -66,3 +69,15 @@ func WithMarker(marker, maxItemCount int) OpOption { op.MaxItemCount = maxItemCount } } + +func WithKeepalive() OpOption { + return func(op *Op) { + op.Keepalive = true + } +} + +func WithSkipLocalCache() OpOption { + return func(op *Op) { + op.SkipLocalCache = true + } +}
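
Usage sketch (illustrative, not part of the change): the snippet below shows how a caller might wire the new Lessor together with the store options introduced above. config.New(), client.NewStore(), lease.NewLessor(), the Lessor methods, store.WithSkipLocalCache(), and ConfigSvc.GetSchedulerInstance(..., opts...) all come from this change; the main() wiring, the key name, the TTL value, and the refreshSchedulerInstance helper are assumptions made only for illustration.

package main

import (
	"context"
	"log"

	"d7y.io/dragonfly/v2/manager/apis/v2/types"
	"d7y.io/dragonfly/v2/manager/config"
	"d7y.io/dragonfly/v2/manager/configsvc"
	"d7y.io/dragonfly/v2/manager/lease"
	"d7y.io/dragonfly/v2/manager/store"
	"d7y.io/dragonfly/v2/manager/store/client"
)

// refreshSchedulerInstance is a hypothetical helper: store.WithSkipLocalCache()
// bypasses the in-process cache so the read goes through to the backing store
// and the returned instance repopulates the cache.
func refreshSchedulerInstance(ctx context.Context, svc *configsvc.ConfigSvc, instanceID string) (*types.SchedulerInstance, error) {
	return svc.GetSchedulerInstance(ctx, instanceID, store.WithSkipLocalCache())
}

func main() {
	ctx := context.TODO()

	// The lessor persists leases through the generic store, backed by the
	// orm LeaseStore registered in ormTables.
	cfg := config.New()
	s, err := client.NewStore(cfg)
	if err != nil {
		log.Fatalf("create store: %v", err)
	}

	lessor, err := lease.NewLessor(s)
	if err != nil {
		log.Fatalf("create lessor: %v", err)
	}
	defer lessor.Close()

	// Grant a lease on a key; a second Grant on the same key fails until the
	// lease expires or is revoked (lease_key is unique in LeaseTable).
	id, err := lessor.Grant(ctx, "scheduler/instance-1", "instance-1", 10)
	if err != nil {
		log.Fatalf("grant lease: %v", err)
	}

	// KeepAlive refreshes the lease in the background; the returned channel is
	// signalled once the lease can no longer be kept alive.
	expired, err := lessor.KeepAlive(ctx, id)
	if err != nil {
		log.Fatalf("keep alive: %v", err)
	}
	go func() {
		<-expired
		log.Printf("lease %s expired", id)
	}()

	// ... hold the lease while doing work, then release it.
	lessor.Revoke(ctx, id)
}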