This commit is contained in:
Cavus700 2024-05-13 17:53:38 +07:00 committed by GitHub
commit 7a83cae275
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -21,6 +21,7 @@ package viper
import (
"bytes"
"context"
"encoding/csv"
"errors"
"fmt"
@ -143,6 +144,16 @@ func DecodeHook(hook mapstructure.DecodeHookFunc) DecoderConfigOption {
}
}
type RemoteConfigEvent uint
const (
RemoteConfigEvent_Unknown RemoteConfigEvent = iota
// Remote config was updated
RemoteConfigEvent_Updated
// Remote config watch routine stopped
RemoteConfigEvent_Stopped
)
// Viper is a prioritized configuration registry. It
// maintains a set of configuration sources, fetches
// values to populate those, and provides them according
@ -217,7 +228,8 @@ type Viper struct {
aliases map[string]string
typeByDefValue bool
onConfigChange func(fsnotify.Event)
onConfigChange func(fsnotify.Event)
onRemoteConfigChange func(RemoteConfigEvent)
logger *slog.Logger
@ -432,6 +444,14 @@ func (v *Viper) OnConfigChange(run func(in fsnotify.Event)) {
v.onConfigChange = run
}
// OnRemoteConfigChange sets the event handler that is called when a remote config file changes.
func OnRemoteConfigChange(run func(RemoteConfigEvent)) { v.OnRemoteConfigChange(run) }
// OnRemoteConfigChange sets the event handler that is called when a remote config file changes.
func (v *Viper) OnRemoteConfigChange(run func(RemoteConfigEvent)) {
v.onRemoteConfigChange = run
}
// WatchConfig starts watching a config file for changes.
func WatchConfig() { v.WatchConfig() }
@ -1967,7 +1987,11 @@ func (v *Viper) WatchRemoteConfig() error {
}
func (v *Viper) WatchRemoteConfigOnChannel() error {
return v.watchKeyValueConfigOnChannel()
return v.watchKeyValueConfigOnChannelWithContext(context.TODO())
}
func (v *Viper) WatchRemoteConfigOnChannelWithContext(ctx context.Context) error {
return v.watchKeyValueConfigOnChannelWithContext(ctx)
}
// Retrieve the first found remote configuration.
@ -2004,20 +2028,32 @@ func (v *Viper) getRemoteConfig(provider RemoteProvider) (map[string]any, error)
return v.kvstore, err
}
// Retrieve the first found remote configuration.
func (v *Viper) watchKeyValueConfigOnChannel() error {
// Watch the first found remote configuration.
func (v *Viper) watchKeyValueConfigOnChannelWithContext(ctx context.Context) error {
if len(v.remoteProviders) == 0 {
return RemoteConfigError("No Remote Providers")
}
for _, rp := range v.remoteProviders {
respc, _ := RemoteConfig.WatchChannel(rp)
// Todo: Add quit channel
go func(rc <-chan *RemoteResponse) {
for {
b := <-rc
reader := bytes.NewReader(b.Value)
v.unmarshalReader(reader, v.kvstore)
select {
case b := <-rc:
reader := bytes.NewReader(b.Value)
if err := v.unmarshalReader(reader, v.kvstore); err != nil {
v.logger.Error(fmt.Errorf("unmarshal remote config update: %w", err).Error())
continue
}
if v.onRemoteConfigChange != nil {
v.onRemoteConfigChange(RemoteConfigEvent_Updated)
}
case <-ctx.Done():
if v.onRemoteConfigChange != nil {
v.onRemoteConfigChange(RemoteConfigEvent_Stopped)
}
return
}
}
}(respc)
return nil