/*
Copyright 2018 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controller

import (
	"fmt"

	"time"

	. "github.com/onsi/ginkgo"
	. "github.com/onsi/gomega"
	"github.com/prometheus/client_golang/prometheus"
	dto "github.com/prometheus/client_model/go"
	"k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/util/workqueue"
	"sigs.k8s.io/controller-runtime/pkg/cache"
	"sigs.k8s.io/controller-runtime/pkg/cache/informertest"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllertest"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics"
	"sigs.k8s.io/controller-runtime/pkg/predicate"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
	"sigs.k8s.io/controller-runtime/pkg/reconcile/reconciletest"
	"sigs.k8s.io/controller-runtime/pkg/source"
)

var _ = Describe("controller", func() {
	var fakeReconcile *reconciletest.FakeReconcile
	var ctrl *Controller
	var queue *controllertest.Queue
	var informers *informertest.FakeInformers
	var stop chan struct{}
	var reconciled chan reconcile.Request
	var request = reconcile.Request{
		NamespacedName: types.NamespacedName{Namespace: "foo", Name: "bar"},
	}

	BeforeEach(func() {
		stop = make(chan struct{})
		reconciled = make(chan reconcile.Request)
		fakeReconcile = &reconciletest.FakeReconcile{
			Chan: reconciled,
		}
		queue = &controllertest.Queue{
			Interface: workqueue.New(),
		}
		informers = &informertest.FakeInformers{}
		ctrl = &Controller{
			MaxConcurrentReconciles: 1,
			Do:    fakeReconcile,
			Queue: queue,
			Cache: informers,
		}
		ctrl.InjectFunc(func(interface{}) error { return nil })
	})

	AfterEach(func() {
		close(stop)
	})

	Describe("Reconciler", func() {
		It("should call the Reconciler function", func() {
			ctrl.Do = reconcile.Func(func(reconcile.Request) (reconcile.Result, error) {
				return reconcile.Result{Requeue: true}, nil
			})
			result, err := ctrl.Reconcile(
				reconcile.Request{NamespacedName: types.NamespacedName{Namespace: "foo", Name: "bar"}})
			Expect(err).NotTo(HaveOccurred())
			Expect(result).To(Equal(reconcile.Result{Requeue: true}))
		})
	})

	Describe("Start", func() {
		It("should return an error if there is an error waiting for the informers", func(done Done) {
			ctrl.WaitForCacheSync = func(<-chan struct{}) bool { return false }
			ctrl.Name = "foo"
			err := ctrl.Start(stop)
			Expect(err).To(HaveOccurred())
			Expect(err.Error()).To(ContainSubstring("failed to wait for foo caches to sync"))

			close(done)
		})

		It("should wait for each informer to sync", func(done Done) {
			// Use a stopped channel so Start doesn't block
			stopped := make(chan struct{})
			close(stopped)

			c, err := cache.New(cfg, cache.Options{})
			Expect(err).NotTo(HaveOccurred())
			c.GetInformer(&v1.Deployment{})
			c.GetInformer(&v1.ReplicaSet{})
			ctrl.Cache = c
			ctrl.WaitForCacheSync = func(<-chan struct{}) bool { return true }

			Expect(ctrl.Start(stopped)).NotTo(HaveOccurred())

			close(done)
		})
	})

	Describe("Watch", func() {
		It("should inject dependencies into the Source", func() {
			src := &source.Kind{Type: &corev1.Pod{}}
			src.InjectCache(ctrl.Cache)
			evthdl := &handler.EnqueueRequestForObject{}
			found := false
			ctrl.SetFields = func(i interface{}) error {
				defer GinkgoRecover()
				if i == src {
					found = true
				}
				return nil
			}
			Expect(ctrl.Watch(src, evthdl)).NotTo(HaveOccurred())
			Expect(found).To(BeTrue(), "Source not injected")
		})

		It("should return an error if there is an error injecting into the Source", func() {
			src := &source.Kind{Type: &corev1.Pod{}}
			src.InjectCache(ctrl.Cache)
			evthdl := &handler.EnqueueRequestForObject{}
			expected := fmt.Errorf("expect fail source")
			ctrl.SetFields = func(i interface{}) error {
				defer GinkgoRecover()
				if i == src {
					return expected
				}
				return nil
			}
			Expect(ctrl.Watch(src, evthdl)).To(Equal(expected))
		})

		It("should inject dependencies into the EventHandler", func() {
			src := &source.Kind{Type: &corev1.Pod{}}
			src.InjectCache(ctrl.Cache)
			evthdl := &handler.EnqueueRequestForObject{}
			found := false
			ctrl.SetFields = func(i interface{}) error {
				defer GinkgoRecover()
				if i == evthdl {
					found = true
				}
				return nil
			}
			Expect(ctrl.Watch(src, evthdl)).NotTo(HaveOccurred())
			Expect(found).To(BeTrue(), "EventHandler not injected")
		})

		It("should return an error if there is an error injecting into the EventHandler", func() {
			src := &source.Kind{Type: &corev1.Pod{}}
			evthdl := &handler.EnqueueRequestForObject{}
			expected := fmt.Errorf("expect fail eventhandler")
			ctrl.SetFields = func(i interface{}) error {
				defer GinkgoRecover()
				if i == evthdl {
					return expected
				}
				return nil
			}
			Expect(ctrl.Watch(src, evthdl)).To(Equal(expected))
		})

		It("should inject dependencies into the Reconciler", func() {
			// TODO(community): Write this
		})

		It("should return an error if there is an error injecting into the Reconciler", func() {
			// TODO(community): Write this
		})

		It("should inject dependencies into all of the Predicates", func() {
			src := &source.Kind{Type: &corev1.Pod{}}
			src.InjectCache(ctrl.Cache)
			evthdl := &handler.EnqueueRequestForObject{}
			pr1 := &predicate.Funcs{}
			pr2 := &predicate.Funcs{}
			found1 := false
			found2 := false
			ctrl.SetFields = func(i interface{}) error {
				defer GinkgoRecover()
				if i == pr1 {
					found1 = true
				}
				if i == pr2 {
					found2 = true
				}
				return nil
			}
			Expect(ctrl.Watch(src, evthdl, pr1, pr2)).NotTo(HaveOccurred())
			Expect(found1).To(BeTrue(), "First Predicated not injected")
			Expect(found2).To(BeTrue(), "Second Predicated not injected")
		})

		It("should return an error if there is an error injecting into any of the Predicates", func() {
			src := &source.Kind{Type: &corev1.Pod{}}
			src.InjectCache(ctrl.Cache)
			evthdl := &handler.EnqueueRequestForObject{}
			pr1 := &predicate.Funcs{}
			pr2 := &predicate.Funcs{}
			expected := fmt.Errorf("expect fail predicate")
			ctrl.SetFields = func(i interface{}) error {
				defer GinkgoRecover()
				if i == pr1 {
					return expected
				}
				return nil
			}
			Expect(ctrl.Watch(src, evthdl, pr1, pr2)).To(Equal(expected))

			ctrl.SetFields = func(i interface{}) error {
				defer GinkgoRecover()
				if i == pr2 {
					return expected
				}
				return nil
			}
			Expect(ctrl.Watch(src, evthdl, pr1, pr2)).To(Equal(expected))
		})

		It("should call Start the Source with the EventHandler, Queue, and Predicates", func() {
			pr1 := &predicate.Funcs{}
			pr2 := &predicate.Funcs{}
			evthdl := &handler.EnqueueRequestForObject{}
			src := source.Func(func(e handler.EventHandler, q workqueue.RateLimitingInterface, p ...predicate.Predicate) error {
				defer GinkgoRecover()
				Expect(e).To(Equal(evthdl))
				Expect(q).To(Equal(ctrl.Queue))
				Expect(p).To(ConsistOf(pr1, pr2))
				return nil
			})
			Expect(ctrl.Watch(src, evthdl, pr1, pr2)).NotTo(HaveOccurred())

		})

		It("should return an error if there is an error starting the Source", func() {
			err := fmt.Errorf("Expected Error: could not start source")
			src := source.Func(func(handler.EventHandler,
				workqueue.RateLimitingInterface,
				...predicate.Predicate) error {
				defer GinkgoRecover()
				return err
			})
			Expect(ctrl.Watch(src, &handler.EnqueueRequestForObject{})).To(Equal(err))
		})
	})

	Describe("Processing queue items from a Controller", func() {
		It("should call Reconciler if an item is enqueued", func(done Done) {
			go func() {
				defer GinkgoRecover()
				Expect(ctrl.Start(stop)).NotTo(HaveOccurred())
			}()
			ctrl.Queue.Add(request)

			By("Invoking Reconciler")
			Expect(<-reconciled).To(Equal(request))

			By("Removing the item from the queue")
			Eventually(ctrl.Queue.Len).Should(Equal(0))
			Eventually(func() int { return ctrl.Queue.NumRequeues(request) }).Should(Equal(0))

			close(done)
		})

		It("should continue to process additional queue items after the first", func(done Done) {
			ctrl.Do = reconcile.Func(func(reconcile.Request) (reconcile.Result, error) {
				defer GinkgoRecover()
				Fail("Reconciler should not have been called")
				return reconcile.Result{}, nil
			})
			go func() {
				defer GinkgoRecover()
				Expect(ctrl.Start(stop)).NotTo(HaveOccurred())
			}()
			ctrl.Queue.Add("foo/bar")

			// Don't expect the string to reconciled
			Expect(ctrl.processNextWorkItem()).To(BeTrue())

			Eventually(ctrl.Queue.Len).Should(Equal(0))
			Eventually(func() int { return ctrl.Queue.NumRequeues(request) }).Should(Equal(0))

			close(done)
		})

		It("should forget an item if it is not a Request and continue processing items", func() {
			// TODO(community): write this test
		})

		It("should requeue a Request if there is an error and continue processing items", func(done Done) {
			fakeReconcile.Err = fmt.Errorf("expected error: reconcile")
			go func() {
				defer GinkgoRecover()
				Expect(ctrl.Start(stop)).NotTo(HaveOccurred())
			}()
			ctrl.Queue.Add(request)

			// Reduce the jitterperiod so we don't have to wait a second before the reconcile function is rerun.
			ctrl.JitterPeriod = time.Millisecond

			By("Invoking Reconciler which will give an error")
			Expect(<-reconciled).To(Equal(request))

			By("Invoking Reconciler a second time without error")
			fakeReconcile.Err = nil
			Expect(<-reconciled).To(Equal(request))

			By("Removing the item from the queue")
			Eventually(ctrl.Queue.Len).Should(Equal(0))
			Eventually(func() int { return ctrl.Queue.NumRequeues(request) }).Should(Equal(0))

			close(done)
		}, 1.0)

		It("should requeue a Request if the Result sets Requeue:true and continue processing items", func() {
			fakeReconcile.Result.Requeue = true
			go func() {
				defer GinkgoRecover()
				Expect(ctrl.Start(stop)).NotTo(HaveOccurred())
			}()
			dq := &DelegatingQueue{RateLimitingInterface: ctrl.Queue}
			ctrl.Queue = dq
			ctrl.Queue.Add(request)
			Expect(dq.countAdd).To(Equal(1))
			Expect(dq.countAddAfter).To(Equal(0))
			Expect(dq.countAddRateLimited).To(Equal(0))

			By("Invoking Reconciler which will ask for requeue")
			Expect(<-reconciled).To(Equal(request))
			Expect(dq.countAdd).To(Equal(1))
			Expect(dq.countAddAfter).To(Equal(0))
			Expect(dq.countAddRateLimited).To(Equal(1))

			By("Invoking Reconciler a second time without asking for requeue")
			fakeReconcile.Result.Requeue = false
			Expect(<-reconciled).To(Equal(request))
			Expect(dq.countAdd).To(Equal(1))
			Expect(dq.countAddAfter).To(Equal(0))
			Expect(dq.countAddRateLimited).To(Equal(1))

			By("Removing the item from the queue")
			Eventually(ctrl.Queue.Len).Should(Equal(0))
			Eventually(func() int { return ctrl.Queue.NumRequeues(request) }).Should(Equal(0))
		})

		It("should requeue a Request after a duration if the Result sets Requeue:true and "+
			"RequeueAfter is set", func() {
			fakeReconcile.Result.RequeueAfter = time.Millisecond * 100
			go func() {
				defer GinkgoRecover()
				Expect(ctrl.Start(stop)).NotTo(HaveOccurred())
			}()
			dq := &DelegatingQueue{RateLimitingInterface: ctrl.Queue}
			ctrl.Queue = dq
			ctrl.Queue.Add(request)
			Expect(dq.countAdd).To(Equal(1))
			Expect(dq.countAddAfter).To(Equal(0))
			Expect(dq.countAddRateLimited).To(Equal(0))

			By("Invoking Reconciler which will ask for requeue")
			Expect(<-reconciled).To(Equal(request))
			Expect(dq.countAdd).To(Equal(1))
			Expect(dq.countAddAfter).To(Equal(1))
			Expect(dq.countAddRateLimited).To(Equal(0))

			By("Invoking Reconciler a second time without asking for requeue")
			fakeReconcile.Result.Requeue = false
			Expect(<-reconciled).To(Equal(request))
			Expect(dq.countAdd).To(Equal(1))
			Expect(dq.countAddAfter).To(Equal(1))
			Expect(dq.countAddRateLimited).To(Equal(0))

			By("Removing the item from the queue")
			Eventually(ctrl.Queue.Len).Should(Equal(0))
			Eventually(func() int { return ctrl.Queue.NumRequeues(request) }).Should(Equal(0))
		})

		It("should forget the Request if Reconciler is successful", func() {
			// TODO(community): write this test
		})

		It("should return if the queue is shutdown", func() {
			// TODO(community): write this test
		})

		It("should wait for informers to be synced before processing items", func() {
			// TODO(community): write this test
		})

		It("should create a new go routine for MaxConcurrentReconciles", func() {
			// TODO(community): write this test
		})

		Context("should update prometheus metrics", func() {
			It("should requeue a Request if there is an error and continue processing items", func(done Done) {
				var queueLength, reconcileErrs dto.Metric
				ctrlmetrics.QueueLength.Reset()
				Expect(func() error {
					ctrlmetrics.QueueLength.WithLabelValues(ctrl.Name).Write(&queueLength)
					if queueLength.GetGauge().GetValue() != 0.0 {
						return fmt.Errorf("metrics not reset")
					}
					return nil
				}()).Should(Succeed())

				ctrlmetrics.ReconcileErrors.Reset()
				Expect(func() error {
					ctrlmetrics.ReconcileErrors.WithLabelValues(ctrl.Name).Write(&reconcileErrs)
					if reconcileErrs.GetCounter().GetValue() != 0.0 {
						return fmt.Errorf("metrics not reset")
					}
					return nil
				}()).Should(Succeed())

				fakeReconcile.Err = fmt.Errorf("expected error: reconcile")
				go func() {
					defer GinkgoRecover()
					Expect(ctrl.Start(stop)).NotTo(HaveOccurred())
				}()
				ctrl.Queue.Add(request)

				// Reduce the jitterperiod so we don't have to wait a second before the reconcile function is rerun.
				ctrl.JitterPeriod = time.Millisecond

				By("Invoking Reconciler which will give an error")
				Expect(<-reconciled).To(Equal(request))
				Eventually(func() error {
					ctrlmetrics.QueueLength.WithLabelValues(ctrl.Name).Write(&queueLength)
					if queueLength.GetGauge().GetValue() != 1.0 {
						return fmt.Errorf("metrics not updated")
					}
					return nil
				}, 2.0).Should(Succeed())
				Eventually(func() error {
					ctrlmetrics.ReconcileErrors.WithLabelValues(ctrl.Name).Write(&reconcileErrs)
					if reconcileErrs.GetCounter().GetValue() != 1.0 {
						return fmt.Errorf("metrics not updated")
					}
					return nil
				}, 2.0).Should(Succeed())

				By("Invoking Reconciler a second time without error")
				fakeReconcile.Err = nil
				Expect(<-reconciled).To(Equal(request))

				By("Removing the item from the queue")
				Eventually(ctrl.Queue.Len).Should(Equal(0))
				Eventually(func() int { return ctrl.Queue.NumRequeues(request) }).Should(Equal(0))

				close(done)
			}, 2.0)

			It("should add a reconcile time to the reconcile time histogram", func(done Done) {
				var reconcileTime dto.Metric
				ctrlmetrics.ReconcileTime.Reset()

				Expect(func() error {
					histObserver := ctrlmetrics.ReconcileTime.WithLabelValues(ctrl.Name)
					hist := histObserver.(prometheus.Histogram)
					hist.Write(&reconcileTime)
					if reconcileTime.GetHistogram().GetSampleCount() != uint64(0) {
						return fmt.Errorf("metrics not reset")
					}
					return nil
				}()).Should(Succeed())

				go func() {
					defer GinkgoRecover()
					Expect(ctrl.Start(stop)).NotTo(HaveOccurred())
				}()
				ctrl.Queue.Add(request)

				By("Invoking Reconciler")
				Expect(<-reconciled).To(Equal(request))

				By("Removing the item from the queue")
				Eventually(ctrl.Queue.Len).Should(Equal(0))
				Eventually(func() int { return ctrl.Queue.NumRequeues(request) }).Should(Equal(0))

				Eventually(func() error {
					histObserver := ctrlmetrics.ReconcileTime.WithLabelValues(ctrl.Name)
					hist := histObserver.(prometheus.Histogram)
					hist.Write(&reconcileTime)
					if reconcileTime.GetHistogram().GetSampleCount() == uint64(0) {
						return fmt.Errorf("metrics not updated")
					}
					return nil
				}, 2.0).Should(Succeed())

				close(done)
			}, 4.0)
		})
	})
})

type DelegatingQueue struct {
	workqueue.RateLimitingInterface

	countAddRateLimited int
	countAdd            int
	countAddAfter       int
}

func (q *DelegatingQueue) AddRateLimited(item interface{}) {
	q.countAddRateLimited++
	q.RateLimitingInterface.AddRateLimited(item)
}

func (q *DelegatingQueue) AddAfter(item interface{}, d time.Duration) {
	q.countAddAfter++
	q.RateLimitingInterface.AddAfter(item, d)
}

func (q *DelegatingQueue) Add(item interface{}) {
	q.countAdd++
	q.RateLimitingInterface.Add(item)
}