Take the max replica requirement of the JobManager (jm) and TaskManagers (tms), update the health interpreter, and add a getResourceQuantity function to the kube library

Signed-off-by: mszacillo <mszacillo@bloomberg.net>
This commit is contained in:
mszacillo 2024-06-26 13:16:30 -04:00
parent f822072c34
commit 4f16de20ea
2 changed files with 50 additions and 15 deletions

View File

@ -17,6 +17,8 @@ limitations under the License.
package luavm
import (
"math"
lua "github.com/yuin/gopher-lua"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
@ -57,6 +59,7 @@ var kubeFuncs = map[string]lua.LGFunction{
"resourceAdd": resourceAdd,
"accuratePodRequirements": accuratePodRequirements,
"getPodDependencies": getPodDependencies,
"getResourceQuantity": getResourceQuantity,
}
func resourceAdd(ls *lua.LState) int {
@ -127,6 +130,30 @@ func getPodDependencies(ls *lua.LState) int {
return 1
}
// getResourceQuantity is the Lua binding registered as "getResourceQuantity"
// in kubeFuncs. It expects exactly one argument, converts it to a
// resource.Quantity via checkResourceQuantity, and pushes the quantity's
// approximate float64 value onto the Lua stack as a number.
//
// A Lua error is raised when the argument count is not one, or when the
// float64 approximation is negative or +Inf. The return value is the number
// of results pushed onto the Lua stack (1 on success).
func getResourceQuantity(ls *lua.LState) int {
	n := ls.GetTop()
	if n != 1 {
		ls.RaiseError("getResourceQuantity only accepts one argument")
		return 0
	}

	q := checkResourceQuantity(ls, n)
	num := q.AsApproximateFloat64()

	// Report the quantity in its canonical string form rather than a
	// Go-syntax dump of the struct, so the Lua error is readable.
	if num < 0 {
		ls.RaiseError("float64 approximation unexpectedly returned a negative value: %s", q.String())
		return 0
	}
	if math.IsInf(num, 1) {
		ls.RaiseError("float64 approximation unexpectedly returned an infinite value: %s", q.String())
		return 0
	}

	ls.Push(lua.LNumber(num))
	return 1
}
func checkResourceQuantity(ls *lua.LState, n int) resource.Quantity {
v := ls.Get(n)
switch typ := v.Type(); typ {

View File

@ -11,7 +11,11 @@ spec:
luaScript: >
function InterpretHealth(observedObj)
-- Returns a boolean derived from observedObj.status; false when no jobStatus has been reported yet.
if observedObj.status ~= nil and observedObj.status.jobStatus ~= nil then
-- NOTE(review): the single-line return directly below and the if/else after it both appear in this view;
-- they look like the before/after forms of the same check -- confirm which one is live.
return observedObj.status.jobStatus.state ~= 'CREATED' and observedObj.status.jobStatus.state ~= 'RECONCILING'
-- Any job state other than CREATED or RECONCILING yields true.
if observedObj.status.jobStatus.state ~= 'CREATED' and observedObj.status.jobStatus.state ~= 'RECONCILING' then
return true
else
-- Job is still CREATED or RECONCILING: the result is true only when jobManagerDeploymentStatus is 'ERROR'.
return observedObj.status.jobManagerDeploymentStatus == 'ERROR'
end
end
-- status or status.jobStatus is missing.
return false
end
@ -24,33 +28,37 @@ spec:
end
function GetReplicas(observedObj)
-- Returns two values: the replica count and a `requires` table holding
-- resourceRequest (cpu, memory) and nodeClaim (nodeSelector, tolerations) read from observedObj.spec.
-- NOTE(review): several statements below appear twice in different forms (e.g. `tms` vs `tm_replicas`,
-- summed vs max resource requests) and `for i = 1, tms do` has no matching `end` in this view;
-- this looks like before/after lines shown together -- confirm against the actual file.
-- FlinkDeployments presently will not be subdivided among clusters, replica should be 1
replica = 1
requires = {
resourceRequest = {},
nodeClaim = {},
}
-- Add jobmanager resources into replica requirement
jm_replicas = observedObj.spec.jobManager.replicas
-- Default to a single jobmanager when isempty(jm_replicas) holds.
if isempty(jm_replicas) then
jm_replicas = 1
end
for i = 1, jm_replicas do
-- cpu is passed through tostring() before being handed to kube.resourceAdd; memory is passed as-is.
requires.resourceRequest.cpu = kube.resourceAdd(requires.resourceRequest.cpu, tostring(observedObj.spec.jobManager.resource.cpu))
requires.resourceRequest.memory = kube.resourceAdd(requires.resourceRequest.memory, observedObj.spec.jobManager.resource.memory)
end
-- Add task manager resources into replica requirement
parallelism = observedObj.spec.job.parallelism
tms = math.ceil(parallelism / observedObj.spec.flinkConfiguration['taskmanager.numberOfTaskSlots'])
-- Task manager count: parallelism divided by 'taskmanager.numberOfTaskSlots', rounded up.
tm_replicas = math.ceil(parallelism / observedObj.spec.flinkConfiguration['taskmanager.numberOfTaskSlots'])
for i = 1, tms do
requires.resourceRequest.cpu = kube.resourceAdd(requires.resourceRequest.cpu, tostring(observedObj.spec.taskManager.resource.cpu))
requires.resourceRequest.memory = kube.resourceAdd(requires.resourceRequest.memory, observedObj.spec.taskManager.resource.memory)
-- Total replicas is the jobmanager count plus the task manager count.
replica = jm_replicas + tm_replicas
-- Until multiple podTemplates are supported in replicaRequirements, take max of cpu + memory values as requirement
requires.resourceRequest.cpu = math.max(observedObj.spec.taskManager.resource.cpu, observedObj.spec.jobManager.resource.cpu)
-- Memory values are converted to numbers with kube.getResourceQuantity for the comparison only;
-- the original memory value of the larger side is what gets stored in the request.
jm_memory_value = kube.getResourceQuantity(observedObj.spec.jobManager.resource.memory)
tm_memory_value = kube.getResourceQuantity(observedObj.spec.taskManager.resource.memory)
if jm_memory_value > tm_memory_value then
requires.resourceRequest.memory = observedObj.spec.jobManager.resource.memory
else
-- Ties fall through to the task manager's memory value.
requires.resourceRequest.memory = observedObj.spec.taskManager.resource.memory
end
-- Until multiple podTemplates are supported, interpreter will only take affinity and toleration input to common podTemplate
requires.nodeClaim.nodeSelector = observedObj.spec.podTemplate.spec.nodeSelector
requires.nodeClaim.tolerations = observedObj.spec.podTemplate.spec.tolerations
return replica, requires
end
statusAggregation: