// Copyright 2018 Granitic. All rights reserved.
// Use of this source code is governed by an Apache 2.0 license that can be found in the LICENSE file at the root of this project.

package hprose_go_nats

import (
	"github.com/hprose/hprose-golang/rpc"
	"github.com/nats-io/go-nats"
	"os"
	"os/signal"
	"strings"
	"sync"
	"syscall"
)

type NatsServer struct {
	rpc.BaseService
	conn        *nats.Conn
	queue       chan *nats.Msg
	signal      chan os.Signal
	uri         string
	opt         *NatsOption
	contextPool sync.Pool
	workerPool  *rpc.WorkerPool
}

func NewServer(opt *NatsOption) rpc.Server {
	server := &NatsServer{
		opt: opt,
		uri: strings.Join(opt.uri, ","),
	}
	server.contextPool.New = func() interface{} {
		return new(rpc.BaseServiceContext)
	}
	server.InitBaseService()
	return server
}

func (ns *NatsServer) worker() {
	for {
		msg, ok := <-ns.queue
		if !ok {
			break
		}
		if nil != msg {
			ns.workerPool.Go(func() {
				ns.handle(msg)
			})
		}
	}
}

func (ns *NatsServer) init() (err error) {
	if nil == ns.conn {
		if ns.conn, err = nats.Connect(ns.uri, ns.opt.options...); nil != err {
			return
		}
	}
	if nil == ns.workerPool {
		ns.workerPool = new(rpc.WorkerPool)
		ns.workerPool.Start()
	}
	if nil == ns.queue {
		ns.queue = make(chan *nats.Msg, ns.opt.queue)
	}
	if _, err = ns.conn.ChanQueueSubscribe(ns.opt.topic, ns.opt.group, ns.queue); nil != err {
		return
	}
	go ns.worker()
	if nil == ns.signal {
		ns.signal = make(chan os.Signal, 1)
		signal.Notify(ns.signal, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL)
	}
	return
}
func (ns *NatsServer) handle(msg *nats.Msg) {
	ctx := ns.contextPool.Get().(*rpc.BaseServiceContext)
	defer ns.contextPool.Put(ctx)

	ctx.InitServiceContext(ns)
	data := ns.BaseService.Handle(msg.Data, ctx)
	ns.conn.Publish(msg.Reply, data)
}

func (ns *NatsServer) URI() string {
	return ns.uri
}

func (ns *NatsServer) Handle() error {
	return nil
}

func (ns *NatsServer) Close() {
	if nil != ns.signal {
		signal.Stop(ns.signal)
		ns.signal = nil
	}
	if nil != ns.queue {
		close(ns.queue)
		ns.queue = nil
	}
	if nil != ns.conn {
		ns.conn.Close()
		ns.conn = nil
	}
	if nil != ns.workerPool {
		ns.workerPool.Stop()
		ns.workerPool = nil
	}
}

func (ns *NatsServer) Start() (err error) {
	if err = ns.init(); nil != err {
		return
	}
	<-ns.signal
	ns.Close()
	return nil
}

func (ns *NatsServer) Restart() {
	ns.signal <- syscall.SIGHUP
}

func (ns *NatsServer) Stop() {
	ns.signal <- syscall.SIGQUIT
}