Update credentials service bolt repo to use an index for sorting;

minor grpc refactoring; add migration that adds index to existing dbs
This commit is contained in:
mitchell 2019-07-31 23:16:20 -04:00
parent ed66895fcb
commit d42af8e859
4 changed files with 228 additions and 47 deletions

View File

@ -4,6 +4,7 @@ import (
"bytes" "bytes"
"context" "context"
"encoding/gob" "encoding/gob"
"fmt"
"os" "os"
"sync" "sync"
@ -32,35 +33,25 @@ func (db BoltDB) GetAllMetadata(ctx context.Context, sourceHost string, errch ch
defer close(mdch) defer close(mdch)
err := db.bolt.View(func(tx *bbolt.Tx) error { err := db.bolt.View(func(tx *bbolt.Tx) error {
bkt := tx.Bucket([]byte(credentialsBkt)) bkt := getCredentialsBucket(tx)
if bkt == nil { if bkt.isEmpty {
return nil return nil
} }
var wg sync.WaitGroup var wg sync.WaitGroup
err := bkt.ForEach(func(_, value []byte) error { c := bkt.hostPrimaryIndex.Cursor()
if sourceHost == "" {
for key, value := c.First(); key != nil; key, value = c.Next() {
wg.Add(1) wg.Add(1)
unmarshalAndSendCred(value, mdch, errch, &wg)
go func(value []byte) {
defer wg.Done()
var cred types.Credential
err := gobUnmarshal(value, &cred)
if err != nil {
errch <- err
return
} }
} else {
if sourceHost == "" || sourceHost == cred.SourceHost { hostBytes := []byte(sourceHost)
mdch <- cred.Metadata for key, value := c.Seek(hostBytes); bytes.HasPrefix(key, hostBytes); key, value = c.Next() {
wg.Add(1)
unmarshalAndSendCred(value, mdch, errch, &wg)
} }
}(value)
return nil
})
if err != nil {
return err
} }
wg.Wait() wg.Wait()
@ -76,10 +67,24 @@ func (db BoltDB) GetAllMetadata(ctx context.Context, sourceHost string, errch ch
return mdch return mdch
} }
func unmarshalAndSendCred(value []byte, mdch chan<- types.Metadata, errch chan<- error, wg *sync.WaitGroup) {
defer wg.Done()
var cred types.Credential
err := gobUnmarshal(value, &cred)
if err != nil {
errch <- err
return
}
mdch <- cred.Metadata
}
func (db BoltDB) Get(ctx context.Context, id string) (output types.Credential, err error) { func (db BoltDB) Get(ctx context.Context, id string) (output types.Credential, err error) {
err = db.bolt.View(func(tx *bbolt.Tx) error { err = db.bolt.View(func(tx *bbolt.Tx) error {
bkt := tx.Bucket([]byte(credentialsBkt)) bkt := getCredentialsBucket(tx)
if bkt == nil { if bkt.isEmpty {
return nil return nil
} }
@ -95,37 +100,98 @@ func (db BoltDB) Get(ctx context.Context, id string) (output types.Credential, e
} }
func (db BoltDB) Put(ctx context.Context, c types.Credential) (err error) { func (db BoltDB) Put(ctx context.Context, c types.Credential) (err error) {
err = db.bolt.Update(func(tx *bbolt.Tx) error { return db.bolt.Update(func(tx *bbolt.Tx) error {
bkt, err := tx.CreateBucketIfNotExists([]byte(credentialsBkt)) bkt := getCredentialsBucket(tx)
if err != nil { bkt.createIfNotExists()
value := bkt.Get([]byte(c.ID))
if value != nil {
var cred types.Credential
if err = gobUnmarshal(value, &cred); err != nil {
return err return err
} }
if err = bkt.Delete([]byte(c.ID)); err != nil {
return err
}
if err = bkt.hostPrimaryIndex.Delete([]byte(genHostPrimaryIdxKey(cred))); err != nil {
return err
}
}
value, err := gobMarshal(c) value, err := gobMarshal(c)
if err != nil { if err != nil {
return err return err
} }
if err = bkt.hostPrimaryIndex.Put([]byte(genHostPrimaryIdxKey(c)), value); err != nil {
return err
}
return bkt.Put([]byte(c.ID), value) return bkt.Put([]byte(c.ID), value)
}) })
return err
} }
func (db BoltDB) Delete(ctx context.Context, id string) (err error) { func (db BoltDB) Delete(ctx context.Context, id string) (err error) {
err = db.bolt.Update(func(tx *bbolt.Tx) error { return db.bolt.Update(func(tx *bbolt.Tx) error {
bkt := tx.Bucket([]byte(credentialsBkt)) bkt := getCredentialsBucket(tx)
if bkt == nil { if bkt.isEmpty {
return nil return nil
} }
value := bkt.Get([]byte(id))
if value == nil {
return nil
}
var cred types.Credential
if err = gobUnmarshal(value, &cred); err != nil {
return err
}
if err = bkt.hostPrimaryIndex.Delete([]byte(genHostPrimaryIdxKey(cred))); err != nil {
return err
}
return bkt.Delete([]byte(id)) return bkt.Delete([]byte(id))
}) })
return err
} }
const credentialsBkt = "credentials" const keyCredentialsBkt = "credentials"
const keyHostAndPrimaryIdx = "sourceHost-primary"
func getCredentialsBucket(tx *bbolt.Tx) credentialsBucket {
bkt := credentialsBucket{
Bucket: tx.Bucket([]byte(keyCredentialsBkt)),
tx: tx,
}
bkt.isEmpty = bkt.Bucket == nil
if !bkt.isEmpty {
bkt.hostPrimaryIndex = bkt.Bucket.Bucket([]byte(keyHostAndPrimaryIdx))
}
return bkt
}
type credentialsBucket struct {
*bbolt.Bucket
tx *bbolt.Tx
hostPrimaryIndex *bbolt.Bucket
isEmpty bool
}
func (bkt *credentialsBucket) createIfNotExists() {
if bkt.isEmpty {
bkt.Bucket, _ = bkt.tx.CreateBucket([]byte(keyCredentialsBkt))
bkt.hostPrimaryIndex, _ = bkt.CreateBucket([]byte(keyHostAndPrimaryIdx))
bkt.isEmpty = false
}
}
func genHostPrimaryIdxKey(cred types.Credential) string {
return fmt.Sprintf("%s-%s-%s", cred.SourceHost, cred.Primary, cred.ID)
}
func gobMarshal(v interface{}) (bs []byte, err error) { func gobMarshal(v interface{}) (bs []byte, err error) {
buf := bytes.NewBuffer(nil) buf := bytes.NewBuffer(nil)

View File

@ -5,6 +5,7 @@ import (
"strings" "strings"
"github.com/go-kit/kit/log" "github.com/go-kit/kit/log"
"github.com/go-kit/kit/transport"
"github.com/go-kit/kit/transport/grpc" "github.com/go-kit/kit/transport/grpc"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
@ -20,31 +21,31 @@ func NewGRPCServer(svc types.Service, logger log.Logger) GRPCServer {
endpoints.MakeGetAllMetadataEndpoint(svc), endpoints.MakeGetAllMetadataEndpoint(svc),
decodeSourceHostRequest, decodeSourceHostRequest,
encodeMetadataStreamResponse, encodeMetadataStreamResponse,
grpc.ServerErrorLogger(logger), grpc.ServerErrorHandler(transport.NewLogErrorHandler(logger)),
), ),
get: grpc.NewServer( get: grpc.NewServer(
endpoints.MakeGetEndpoint(svc), endpoints.MakeGetEndpoint(svc),
decodeIdRequest, decodeIdRequest,
encodeCredentialResponse, encodeCredentialResponse,
grpc.ServerErrorLogger(logger), grpc.ServerErrorHandler(transport.NewLogErrorHandler(logger)),
), ),
create: grpc.NewServer( create: grpc.NewServer(
endpoints.MakeCreateEndpoint(svc), endpoints.MakeCreateEndpoint(svc),
decodeCredentialRequest, decodeCredentialRequest,
encodeCredentialResponse, encodeCredentialResponse,
grpc.ServerErrorLogger(logger), grpc.ServerErrorHandler(transport.NewLogErrorHandler(logger)),
), ),
update: grpc.NewServer( update: grpc.NewServer(
endpoints.MakeUpdateEndpoint(svc), endpoints.MakeUpdateEndpoint(svc),
decodeUpdateRequest, decodeUpdateRequest,
encodeCredentialResponse, encodeCredentialResponse,
grpc.ServerErrorLogger(logger), grpc.ServerErrorHandler(transport.NewLogErrorHandler(logger)),
), ),
delete: grpc.NewServer( delete: grpc.NewServer(
endpoints.MakeDeleteEndpoint(svc), endpoints.MakeDeleteEndpoint(svc),
decodeIdRequest, decodeIdRequest,
noOp, noOp,
grpc.ServerErrorLogger(logger), grpc.ServerErrorHandler(transport.NewLogErrorHandler(logger)),
), ),
} }
} }
@ -58,13 +59,12 @@ type GRPCServer struct {
} }
func (s GRPCServer) GetAllMetadata(r *protobuf.SourceHostRequest, srv protobuf.Credentials_GetAllMetadataServer) (err error) { func (s GRPCServer) GetAllMetadata(r *protobuf.SourceHostRequest, srv protobuf.Credentials_GetAllMetadataServer) (err error) {
defer func() { err = handlerGRPCError(err) }()
var i interface{} var i interface{}
ctx := srv.Context() ctx := srv.Context()
ctx, i, err = s.getAllMetadata.ServeGRPC(ctx, *r) ctx, i, err = s.getAllMetadata.ServeGRPC(ctx, *r)
if err != nil { if err != nil {
err = handlerGRPCError(err)
return err return err
} }
@ -74,14 +74,19 @@ receiveLoop:
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
err = ctx.Err()
break receiveLoop break receiveLoop
case err = <-mds.Errors: case err = <-mds.Errors:
if err != nil {
err = handlerGRPCError(err)
break receiveLoop break receiveLoop
}
case md, ok := <-mds.Metadata: case md, ok := <-mds.Metadata:
if !ok { if !ok {
break receiveLoop break receiveLoop
} }
if err = srv.Send(&md); err != nil { if err = srv.Send(&md); err != nil {
err = handlerGRPCError(err)
break receiveLoop break receiveLoop
} }
} }

View File

@ -18,7 +18,7 @@ const keyCredentials = "credentials"
func main() { func main() {
redisHost := pflag.StringP("redis-host", "r", "127.0.0.1:6379", "specify the redis host") redisHost := pflag.StringP("redis-host", "r", "127.0.0.1:6379", "specify the redis host")
boltFile := pflag.StringP("bolt-file", "b", "./data/bolt.db", "specify the bolt DB file") boltFile := pflag.StringP("bolt-file", "f", "./data/bolt.db", "specify the bolt DB file")
help := pflag.BoolP("help", "h", false, "see help") help := pflag.BoolP("help", "h", false, "see help")
pflag.Parse() pflag.Parse()

View File

@ -0,0 +1,110 @@
package main
import (
"bytes"
"encoding/gob"
"errors"
"fmt"
"sync"
"github.com/spf13/pflag"
"go.etcd.io/bbolt"
"github.com/mitchell/selfpass/services/credentials/types"
"github.com/mitchell/selfpass/services/migrations/migration"
)
const keyCredentialsBkt = "credentials"
const keyHostAndPrimaryIdx = "sourceHost-primary"
func main() {
file := pflag.StringP("file", "f", "./data/bolt.db", "specify the bolt db file")
help := pflag.BoolP("help", "h", false, "see help")
pflag.Parse()
if *help {
pflag.PrintDefaults()
return
}
db, err := bbolt.Open(*file, 0600, nil)
migration.Check(err)
fmt.Println("Beginning migration...")
creds := make(chan types.Credential, 1)
errs := make(chan error, 1)
go func() {
defer close(creds)
var wg sync.WaitGroup
errs <- db.View(func(tx *bbolt.Tx) error {
bkt := tx.Bucket([]byte(keyCredentialsBkt))
if bkt == nil {
return errors.New("no credentials bucket")
}
return bkt.ForEach(func(_, value []byte) error {
wg.Add(1)
go func(value []byte) {
defer wg.Done()
reader := bytes.NewReader(value)
var cred types.Credential
errs <- gob.NewDecoder(reader).Decode(&cred)
creds <- cred
}(value)
return nil
})
})
wg.Wait()
}()
go func() {
defer close(errs)
var wg sync.WaitGroup
for cred := range creds {
key := fmt.Sprintf("%s-%s-%s", cred.SourceHost, cred.Primary, cred.ID)
fmt.Printf("Adding credential %s to index as %s.\n", cred.ID, key)
wg.Add(1)
go func(key string, cred types.Credential) {
defer wg.Done()
buf := bytes.NewBuffer(nil)
migration.Check(gob.NewEncoder(buf).Encode(cred))
value := buf.Bytes()
errs <- db.Batch(func(tx *bbolt.Tx) error {
credBkt := tx.Bucket([]byte(keyCredentialsBkt))
bkt, err := credBkt.CreateBucketIfNotExists([]byte(keyHostAndPrimaryIdx))
if err != nil {
return err
}
return bkt.Put([]byte(key), value)
})
}(key, cred)
}
wg.Wait()
}()
for err = range errs {
migration.Check(err)
}
fmt.Println("Migration done.")
}