mirror of
https://github.com/spf13/viper
synced 2025-05-06 12:17:18 +00:00
add watch-remote functionality which is using channels (async)
possible risk v.keystore is updated async (currenlt no mutex are used)
This commit is contained in:
parent
d68d0a5b46
commit
5f2abee1c5
2 changed files with 50 additions and 0 deletions
|
@ -40,6 +40,29 @@ func (rc remoteConfigProvider) Watch(rp viper.RemoteProvider) (io.Reader, error)
|
|||
|
||||
return bytes.NewReader(resp), nil
|
||||
}
|
||||
func (rc remoteConfigProvider) WatchChannel(rp viper.RemoteProvider) (<-chan *viper.RemoteResponse, chan bool) {
|
||||
cm, err := getConfigManager(rp)
|
||||
if err != nil {
|
||||
return nil, nil
|
||||
}
|
||||
quit := make(chan bool)
|
||||
respChan := make(chan *viper.RemoteResponse, 0)
|
||||
watchchan := cm.Watch(rp.Path(), quit)
|
||||
// Todo Add quit channel
|
||||
go func(w <-chan *crypt.Response) {
|
||||
for {
|
||||
resp := <-w
|
||||
respChan <- &viper.RemoteResponse{
|
||||
Error: resp.Error,
|
||||
Value: resp.Value,
|
||||
}
|
||||
}
|
||||
|
||||
}(watchchan)
|
||||
return respChan ,quit
|
||||
|
||||
}
|
||||
|
||||
|
||||
func getConfigManager(rp viper.RemoteProvider) (crypt.ConfigManager, error) {
|
||||
|
||||
|
|
27
viper.go
27
viper.go
|
@ -38,6 +38,11 @@ import (
|
|||
"github.com/spf13/pflag"
|
||||
)
|
||||
|
||||
// Response represents a response from a backend store.
|
||||
type RemoteResponse struct {
|
||||
Value []byte
|
||||
Error error
|
||||
}
|
||||
var v *Viper
|
||||
|
||||
func init() {
|
||||
|
@ -47,6 +52,7 @@ func init() {
|
|||
type remoteConfigFactory interface {
|
||||
Get(rp RemoteProvider) (io.Reader, error)
|
||||
Watch(rp RemoteProvider) (io.Reader, error)
|
||||
WatchChannel(rp RemoteProvider)(<-chan *RemoteResponse, chan bool)
|
||||
}
|
||||
|
||||
// RemoteConfig is optional, see the remote package
|
||||
|
@ -1255,6 +1261,10 @@ func (v *Viper) WatchRemoteConfig() error {
|
|||
return v.watchKeyValueConfig()
|
||||
}
|
||||
|
||||
func (v *Viper) WatchRemoteConfigOnChannel() error {
|
||||
return v.watchKeyValueConfigOnChannel()
|
||||
}
|
||||
|
||||
// Unmarshall a Reader into a map.
|
||||
// Should probably be an unexported function.
|
||||
func unmarshalReader(in io.Reader, c map[string]interface{}) error {
|
||||
|
@ -1298,6 +1308,23 @@ func (v *Viper) getRemoteConfig(provider RemoteProvider) (map[string]interface{}
|
|||
return v.kvstore, err
|
||||
}
|
||||
|
||||
// Retrieve the first found remote configuration.
|
||||
func (v *Viper) watchKeyValueConfigOnChannel() error {
|
||||
for _, rp := range v.remoteProviders {
|
||||
respc, _ := RemoteConfig.WatchChannel(rp)
|
||||
//Todo: Add quit channel
|
||||
go func(rc <-chan *RemoteResponse) {
|
||||
for {
|
||||
b := <-rc
|
||||
reader := bytes.NewReader(b.Value)
|
||||
v.unmarshalReader(reader, v.kvstore)
|
||||
}
|
||||
}(respc)
|
||||
return nil
|
||||
}
|
||||
return RemoteConfigError("No Files Found")
|
||||
}
|
||||
|
||||
// Retrieve the first found remote configuration.
|
||||
func (v *Viper) watchKeyValueConfig() error {
|
||||
for _, rp := range v.remoteProviders {
|
||||
|
|
Loading…
Add table
Reference in a new issue