// Copyright 2019 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

package setting

import (
	"path/filepath"
	"strconv"
	"time"

	"code.gitea.io/gitea/modules/log"

	ini "gopkg.in/ini.v1"
)

// QueueSettings represent the settings for a queue from the ini
type QueueSettings struct {
	Name             string
	DataDir          string
	QueueLength      int `ini:"LENGTH"`
	BatchLength      int
	ConnectionString string
	Type             string
	QueueName        string
	SetName          string
	WrapIfNecessary  bool
	MaxAttempts      int
	Timeout          time.Duration
	Workers          int
	MaxWorkers       int
	BlockTimeout     time.Duration
	BoostTimeout     time.Duration
	BoostWorkers     int
}

// Queue settings
var Queue = QueueSettings{}

// GetQueueSettings returns the queue settings for the appropriately named queue
func GetQueueSettings(name string) QueueSettings {
	q := QueueSettings{}
	sec := Cfg.Section("queue." + name)
	q.Name = name

	// DataDir is not directly inheritable
	q.DataDir = filepath.ToSlash(filepath.Join(Queue.DataDir, "common"))
	// QueueName is not directly inheritable either
	q.QueueName = name + Queue.QueueName
	for _, key := range sec.Keys() {
		switch key.Name() {
		case "DATADIR":
			q.DataDir = key.MustString(q.DataDir)
		case "QUEUE_NAME":
			q.QueueName = key.MustString(q.QueueName)
		case "SET_NAME":
			q.SetName = key.MustString(q.SetName)
		}
	}
	if len(q.SetName) == 0 && len(Queue.SetName) > 0 {
		q.SetName = q.QueueName + Queue.SetName
	}
	if !filepath.IsAbs(q.DataDir) {
		q.DataDir = filepath.ToSlash(filepath.Join(AppDataPath, q.DataDir))
	}
	_, _ = sec.NewKey("DATADIR", q.DataDir)

	// The rest are...
	q.QueueLength = sec.Key("LENGTH").MustInt(Queue.QueueLength)
	q.BatchLength = sec.Key("BATCH_LENGTH").MustInt(Queue.BatchLength)
	q.ConnectionString = sec.Key("CONN_STR").MustString(Queue.ConnectionString)
	q.Type = sec.Key("TYPE").MustString(Queue.Type)
	q.WrapIfNecessary = sec.Key("WRAP_IF_NECESSARY").MustBool(Queue.WrapIfNecessary)
	q.MaxAttempts = sec.Key("MAX_ATTEMPTS").MustInt(Queue.MaxAttempts)
	q.Timeout = sec.Key("TIMEOUT").MustDuration(Queue.Timeout)
	q.Workers = sec.Key("WORKERS").MustInt(Queue.Workers)
	q.MaxWorkers = sec.Key("MAX_WORKERS").MustInt(Queue.MaxWorkers)
	q.BlockTimeout = sec.Key("BLOCK_TIMEOUT").MustDuration(Queue.BlockTimeout)
	q.BoostTimeout = sec.Key("BOOST_TIMEOUT").MustDuration(Queue.BoostTimeout)
	q.BoostWorkers = sec.Key("BOOST_WORKERS").MustInt(Queue.BoostWorkers)

	return q
}

// NewQueueService sets up the default settings for Queues
// This is exported for tests to be able to use the queue
func NewQueueService() {
	sec := Cfg.Section("queue")
	Queue.DataDir = filepath.ToSlash(sec.Key("DATADIR").MustString("queues/"))
	if !filepath.IsAbs(Queue.DataDir) {
		Queue.DataDir = filepath.ToSlash(filepath.Join(AppDataPath, Queue.DataDir))
	}
	Queue.QueueLength = sec.Key("LENGTH").MustInt(20)
	Queue.BatchLength = sec.Key("BATCH_LENGTH").MustInt(20)
	Queue.ConnectionString = sec.Key("CONN_STR").MustString("")
	defaultType := sec.Key("TYPE").String()
	Queue.Type = sec.Key("TYPE").MustString("persistable-channel")
	Queue.WrapIfNecessary = sec.Key("WRAP_IF_NECESSARY").MustBool(true)
	Queue.MaxAttempts = sec.Key("MAX_ATTEMPTS").MustInt(10)
	Queue.Timeout = sec.Key("TIMEOUT").MustDuration(GracefulHammerTime + 30*time.Second)
	Queue.Workers = sec.Key("WORKERS").MustInt(0)
	Queue.MaxWorkers = sec.Key("MAX_WORKERS").MustInt(10)
	Queue.BlockTimeout = sec.Key("BLOCK_TIMEOUT").MustDuration(1 * time.Second)
	Queue.BoostTimeout = sec.Key("BOOST_TIMEOUT").MustDuration(5 * time.Minute)
	Queue.BoostWorkers = sec.Key("BOOST_WORKERS").MustInt(1)
	Queue.QueueName = sec.Key("QUEUE_NAME").MustString("_queue")
	Queue.SetName = sec.Key("SET_NAME").MustString("")

	// Now handle the old issue_indexer configuration
	section := Cfg.Section("queue.issue_indexer")
	directlySet := toDirectlySetKeysMap(section)
	if !directlySet["TYPE"] && defaultType == "" {
		switch Indexer.IssueQueueType {
		case LevelQueueType:
			_, _ = section.NewKey("TYPE", "level")
		case ChannelQueueType:
			_, _ = section.NewKey("TYPE", "persistable-channel")
		case RedisQueueType:
			_, _ = section.NewKey("TYPE", "redis")
		case "":
			_, _ = section.NewKey("TYPE", "level")
		default:
			log.Fatal("Unsupported indexer queue type: %v",
				Indexer.IssueQueueType)
		}
	}
	if !directlySet["LENGTH"] && Indexer.UpdateQueueLength != 0 {
		_, _ = section.NewKey("LENGTH", strconv.Itoa(Indexer.UpdateQueueLength))
	}
	if !directlySet["BATCH_LENGTH"] && Indexer.IssueQueueBatchNumber != 0 {
		_, _ = section.NewKey("BATCH_LENGTH", strconv.Itoa(Indexer.IssueQueueBatchNumber))
	}
	if !directlySet["DATADIR"] && Indexer.IssueQueueDir != "" {
		_, _ = section.NewKey("DATADIR", Indexer.IssueQueueDir)
	}
	if !directlySet["CONN_STR"] && Indexer.IssueQueueConnStr != "" {
		_, _ = section.NewKey("CONN_STR", Indexer.IssueQueueConnStr)
	}

	// Handle the old mailer configuration
	handleOldLengthConfiguration("mailer", Cfg.Section("mailer").Key("SEND_BUFFER_LEN").MustInt(100))

	// Handle the old test pull requests configuration
	// Please note this will be a unique queue
	handleOldLengthConfiguration("pr_patch_checker", Cfg.Section("repository").Key("PULL_REQUEST_QUEUE_LENGTH").MustInt(1000))

	// Handle the old mirror queue configuration
	// Please note this will be a unique queue
	handleOldLengthConfiguration("mirror", Cfg.Section("repository").Key("MIRROR_QUEUE_LENGTH").MustInt(1000))
}

// handleOldLengthConfiguration allows fallback to older configuration. `[queue.name]` `LENGTH` will override this configuration, but
// if that is left unset then we should fallback to the older configuration. (Except where the new length woul be <=0)
func handleOldLengthConfiguration(queueName string, value int) {
	// Don't override with 0
	if value <= 0 {
		return
	}

	section := Cfg.Section("queue." + queueName)
	directlySet := toDirectlySetKeysMap(section)
	if !directlySet["LENGTH"] {
		_, _ = section.NewKey("LENGTH", strconv.Itoa(value))
	}
}

// toDirectlySetKeysMap returns a bool map of keys directly set by this section
// Note: we cannot use section.HasKey(...) as that will immediately set the Key if a parent section has the Key
// but this section does not.
func toDirectlySetKeysMap(section *ini.Section) map[string]bool {
	sectionMap := map[string]bool{}
	for _, key := range section.Keys() {
		sectionMap[key.Name()] = true
	}
	return sectionMap
}