mirror of
https://github.com/kevin-DL/services.git
synced 2026-01-11 19:04:35 +00:00
replace db with store in function (#318)
* replace db with store in function * . * set id properly * .
This commit is contained in:
@@ -14,21 +14,18 @@ import (
|
||||
"github.com/micro/micro/v3/service/config"
|
||||
"github.com/micro/micro/v3/service/errors"
|
||||
log "github.com/micro/micro/v3/service/logger"
|
||||
"gopkg.in/yaml.v2"
|
||||
|
||||
"github.com/micro/micro/v3/service/runtime/source/git"
|
||||
|
||||
_struct "github.com/golang/protobuf/ptypes/struct"
|
||||
db "github.com/micro/services/db/proto"
|
||||
"github.com/micro/micro/v3/service/store"
|
||||
function "github.com/micro/services/function/proto"
|
||||
"github.com/micro/services/pkg/tenant"
|
||||
"gopkg.in/yaml.v2"
|
||||
)
|
||||
|
||||
type Function struct {
|
||||
project string
|
||||
// eg. https://us-central1-m3o-apis.cloudfunctions.net/
|
||||
address string
|
||||
db db.DbService
|
||||
limit int
|
||||
}
|
||||
|
||||
type Func struct {
|
||||
@@ -37,7 +34,7 @@ type Func struct {
|
||||
Project string `json:"project"`
|
||||
}
|
||||
|
||||
func NewFunction(db db.DbService) *Function {
|
||||
func NewFunction() *Function {
|
||||
v, err := config.Get("function.service_account_json")
|
||||
if err != nil {
|
||||
log.Fatalf("function.service_account_json: %v", err)
|
||||
@@ -64,7 +61,14 @@ func NewFunction(db db.DbService) *Function {
|
||||
if len(project) == 0 {
|
||||
log.Fatalf("empty project")
|
||||
}
|
||||
|
||||
v, err = config.Get("function.limit")
|
||||
if err != nil {
|
||||
log.Fatalf("function.limit: %v", err)
|
||||
}
|
||||
limit := v.Int(0)
|
||||
if limit > 0 {
|
||||
log.Infof("Function limit is %d", limit)
|
||||
}
|
||||
v, err = config.Get("function.service_account")
|
||||
if err != nil {
|
||||
log.Fatalf("function.service_account: %v", err)
|
||||
@@ -97,7 +101,7 @@ func NewFunction(db db.DbService) *Function {
|
||||
log.Fatalf(string(outp))
|
||||
}
|
||||
log.Info(string(outp))
|
||||
return &Function{project: project, address: address, db: db}
|
||||
return &Function{project: project, address: address, limit: limit}
|
||||
}
|
||||
|
||||
func (e *Function) Deploy(ctx context.Context, req *function.DeployRequest, rsp *function.DeployResponse) error {
|
||||
@@ -110,6 +114,9 @@ func (e *Function) Deploy(ctx context.Context, req *function.DeployRequest, rsp
|
||||
if len(req.Repo) == 0 {
|
||||
return errors.BadRequest("function.deploy", "Missing repo")
|
||||
}
|
||||
if req.Runtime == "" {
|
||||
return fmt.Errorf("missing runtime field, please specify nodejs14, go116 etc")
|
||||
}
|
||||
|
||||
gitter := git.NewGitter(map[string]string{})
|
||||
|
||||
@@ -141,15 +148,28 @@ func (e *Function) Deploy(ctx context.Context, req *function.DeployRequest, rsp
|
||||
project = "default"
|
||||
}
|
||||
|
||||
readRsp, err := e.db.Read(ctx, &db.ReadRequest{
|
||||
Table: "functions",
|
||||
Query: fmt.Sprintf("tenantId == '%v' and project == '%v' and name == '%v'", tenantId, project, req.Name),
|
||||
})
|
||||
if err != nil {
|
||||
key := fmt.Sprintf("%s/%s/%s", tenantId, project, req.Name)
|
||||
|
||||
records, err := store.Read(key)
|
||||
if err != nil && err != store.ErrNotFound {
|
||||
return err
|
||||
}
|
||||
if req.Runtime == "" {
|
||||
return fmt.Errorf("missing runtime field, please specify nodejs14, go116 etc")
|
||||
|
||||
if len(records) > 0 {
|
||||
return errors.BadRequest("function.deploy", "already exists")
|
||||
}
|
||||
|
||||
// check for function limit
|
||||
if e.limit > 0 {
|
||||
// read all the records for the user
|
||||
records, err := store.Read(tenantId+"/", store.ReadPrefix())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if v := len(records); v >= e.limit {
|
||||
return errors.BadRequest("function.deploy", "deployment limit reached")
|
||||
}
|
||||
}
|
||||
|
||||
// process the env vars to the required format
|
||||
@@ -178,9 +198,8 @@ func (e *Function) Deploy(ctx context.Context, req *function.DeployRequest, rsp
|
||||
}
|
||||
}()
|
||||
|
||||
s := &_struct.Struct{}
|
||||
id := fmt.Sprintf("%v-%v-%v", tenantId, project, req.Name)
|
||||
jso, _ := json.Marshal(map[string]interface{}{
|
||||
rec := store.NewRecord(key, map[string]interface{}{
|
||||
"id": id,
|
||||
"project": project,
|
||||
"name": req.Name,
|
||||
@@ -189,30 +208,107 @@ func (e *Function) Deploy(ctx context.Context, req *function.DeployRequest, rsp
|
||||
"subfolder": req.Subfolder,
|
||||
"entrypoint": req.Entrypoint,
|
||||
"runtime": req.Runtime,
|
||||
"env_vars": envVars,
|
||||
})
|
||||
err = s.UnmarshalJSON(jso)
|
||||
|
||||
// write the record
|
||||
return store.Write(rec)
|
||||
}
|
||||
|
||||
func (e *Function) Update(ctx context.Context, req *function.UpdateRequest, rsp *function.UpdateResponse) error {
|
||||
log.Info("Received Function.Update request")
|
||||
|
||||
if len(req.Name) == 0 {
|
||||
return errors.BadRequest("function.update", "Missing name")
|
||||
}
|
||||
|
||||
if len(req.Repo) == 0 {
|
||||
return errors.BadRequest("function.update", "Missing repo")
|
||||
}
|
||||
if req.Runtime == "" {
|
||||
return fmt.Errorf("missing runtime field, please specify nodejs14, go116 etc")
|
||||
}
|
||||
|
||||
tenantId, ok := tenant.FromContext(ctx)
|
||||
if !ok {
|
||||
tenantId = "micro"
|
||||
}
|
||||
|
||||
multitenantPrefix := strings.Replace(tenantId, "/", "-", -1)
|
||||
if req.Entrypoint == "" {
|
||||
req.Entrypoint = req.Name
|
||||
}
|
||||
|
||||
project := req.Project
|
||||
if project == "" {
|
||||
project = "default"
|
||||
}
|
||||
|
||||
key := fmt.Sprintf("%s/%s/%s", tenantId, project, req.Name)
|
||||
|
||||
records, err := store.Read(key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(readRsp.Records) > 0 {
|
||||
_, err = e.db.Update(ctx, &db.UpdateRequest{
|
||||
Table: "functions",
|
||||
Record: s,
|
||||
Id: id,
|
||||
})
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
|
||||
if len(records) == 0 {
|
||||
return errors.BadRequest("function.deploy", "function does not exist")
|
||||
}
|
||||
|
||||
gitter := git.NewGitter(map[string]string{})
|
||||
|
||||
for _, branch := range []string{"master", "main"} {
|
||||
err = gitter.Checkout(req.Repo, branch)
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
return err
|
||||
}
|
||||
_, err = e.db.Create(ctx, &db.CreateRequest{
|
||||
Table: "functions",
|
||||
Record: s,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
return errors.InternalServerError("function.update", err.Error())
|
||||
}
|
||||
return err
|
||||
|
||||
// process the env vars to the required format
|
||||
var envVars []string
|
||||
|
||||
for k, v := range req.EnvVars {
|
||||
envVars = append(envVars, k+"="+v)
|
||||
}
|
||||
|
||||
go func() {
|
||||
// https://jsoverson.medium.com/how-to-deploy-node-js-functions-to-google-cloud-8bba05e9c10a
|
||||
cmd := exec.Command("gcloud", "functions", "deploy",
|
||||
multitenantPrefix+"-"+req.Name, "--region", "europe-west1",
|
||||
"--allow-unauthenticated", "--entry-point", req.Entrypoint,
|
||||
"--trigger-http", "--project", e.project, "--runtime", req.Runtime)
|
||||
|
||||
// if env vars exist then set them
|
||||
if len(envVars) > 0 {
|
||||
cmd.Args = append(cmd.Args, "--set-env-vars", strings.Join(envVars, ","))
|
||||
}
|
||||
|
||||
cmd.Dir = filepath.Join(gitter.RepoDir(), req.Subfolder)
|
||||
outp, err := cmd.CombinedOutput()
|
||||
if err != nil {
|
||||
log.Error(fmt.Errorf(string(outp)))
|
||||
}
|
||||
}()
|
||||
|
||||
id := fmt.Sprintf("%v-%v-%v", tenantId, project, req.Name)
|
||||
rec := store.NewRecord(key, map[string]interface{}{
|
||||
"id": id,
|
||||
"project": project,
|
||||
"name": req.Name,
|
||||
"tenantId": tenantId,
|
||||
"repo": req.Repo,
|
||||
"subfolder": req.Subfolder,
|
||||
"entrypoint": req.Entrypoint,
|
||||
"runtime": req.Runtime,
|
||||
"env_vars": envVars,
|
||||
})
|
||||
|
||||
// write the record
|
||||
return store.Write(rec)
|
||||
}
|
||||
|
||||
func (e *Function) Call(ctx context.Context, req *function.CallRequest, rsp *function.CallResponse) error {
|
||||
@@ -289,12 +385,9 @@ func (e *Function) Delete(ctx context.Context, req *function.DeleteRequest, rsp
|
||||
return err
|
||||
}
|
||||
|
||||
id := fmt.Sprintf("%v-%v-%v", tenantId, project, req.Name)
|
||||
_, err = e.db.Delete(ctx, &db.DeleteRequest{
|
||||
Table: "functions",
|
||||
Id: id,
|
||||
})
|
||||
return err
|
||||
key := fmt.Sprintf("%v/%v/%v", tenantId, project, req.Name)
|
||||
|
||||
return store.Delete(key)
|
||||
}
|
||||
|
||||
func (e *Function) List(ctx context.Context, req *function.ListRequest, rsp *function.ListResponse) error {
|
||||
@@ -304,21 +397,18 @@ func (e *Function) List(ctx context.Context, req *function.ListRequest, rsp *fun
|
||||
if !ok {
|
||||
tenantId = "micro"
|
||||
}
|
||||
project := req.Project
|
||||
|
||||
q := fmt.Sprintf(`tenantId == "%v"`, tenantId)
|
||||
if project != "" {
|
||||
q += fmt.Sprintf(` and project == "%v"`, project)
|
||||
key := tenantId + "/"
|
||||
|
||||
project := req.Project
|
||||
if len(project) > 0 {
|
||||
key = key + "/" + project + "/"
|
||||
}
|
||||
log.Infof("Making query %v", q)
|
||||
readRsp, err := e.db.Read(ctx, &db.ReadRequest{
|
||||
Table: "functions",
|
||||
Query: q,
|
||||
})
|
||||
|
||||
records, err := store.Read(key, store.ReadPrefix())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
log.Info(readRsp.Records)
|
||||
|
||||
multitenantPrefix := strings.Replace(tenantId, "/", "-", -1)
|
||||
cmd := exec.Command("gcloud", "functions", "list", "--project", e.project, "--filter", "name~"+multitenantPrefix+"*")
|
||||
@@ -326,8 +416,10 @@ func (e *Function) List(ctx context.Context, req *function.ListRequest, rsp *fun
|
||||
if err != nil {
|
||||
log.Error(fmt.Errorf(string(outp)))
|
||||
}
|
||||
|
||||
lines := strings.Split(string(outp), "\n")
|
||||
statuses := map[string]string{}
|
||||
|
||||
for _, line := range lines {
|
||||
fields := strings.Fields(line)
|
||||
if len(fields) < 2 {
|
||||
@@ -337,17 +429,16 @@ func (e *Function) List(ctx context.Context, req *function.ListRequest, rsp *fun
|
||||
}
|
||||
|
||||
rsp.Functions = []*function.Func{}
|
||||
for _, record := range readRsp.Records {
|
||||
m := record.AsMap()
|
||||
bs, _ := json.Marshal(m)
|
||||
f := &function.Func{}
|
||||
err = json.Unmarshal(bs, f)
|
||||
if err != nil {
|
||||
|
||||
for _, record := range records {
|
||||
f := new(function.Func)
|
||||
if err := record.Decode(f); err != nil {
|
||||
return err
|
||||
}
|
||||
f.Status = statuses[multitenantPrefix+"-"+f.Name]
|
||||
rsp.Functions = append(rsp.Functions, f)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -367,12 +458,9 @@ func (e *Function) Describe(ctx context.Context, req *function.DescribeRequest,
|
||||
}
|
||||
|
||||
multitenantPrefix := strings.Replace(tenantId, "/", "-", -1)
|
||||
id := fmt.Sprintf("%v-%v-%v", tenantId, project, req.Name)
|
||||
key := fmt.Sprintf("%v/%v/%v", tenantId, project, req.Name)
|
||||
|
||||
readRsp, err := e.db.Read(ctx, &db.ReadRequest{
|
||||
Table: "functions",
|
||||
Id: id,
|
||||
})
|
||||
records, err := store.Read(key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -391,12 +479,9 @@ func (e *Function) Describe(ctx context.Context, req *function.DescribeRequest,
|
||||
return err
|
||||
}
|
||||
|
||||
if len(readRsp.Records) > 0 {
|
||||
m := readRsp.Records[0].AsMap()
|
||||
bs, _ := json.Marshal(m)
|
||||
if len(records) > 0 {
|
||||
f := &function.Func{}
|
||||
err = json.Unmarshal(bs, f)
|
||||
if err != nil {
|
||||
if err := records[0].Decode(f); err != nil {
|
||||
return err
|
||||
}
|
||||
rsp.Function = f
|
||||
|
||||
Reference in New Issue
Block a user