import { Component, NgZone, OnDestroy, OnInit } from '@angular/core'
import { TableWithFilters } from '../../shared/absracts/table-with-filters'
import { SpinnerService } from '../../services/spinner/spinner.service'
import { LoggerService } from '../../services/logger/logger.service'
import { MethodsService } from '../../services/methods/methods.service'
import { UserDataService } from '../../services/user-data/user-data.service'
import { GlobalNotification, GlobalState } from '../../app.state'
import { PiKafkaPanelService } from '../../services/api/services/pi-kafka-panel.service'
import { PiKafkaPanelProcessorService } from '../../services/api/processors/pi-kafka-panel-processor.service'

/**
 * Table view over Kafka consumer-group metrics ("PI Kafka Panel").
 *
 * Rows are loaded through `PiKafkaPanelService` in `refreshTable()` and then sorted,
 * filtered and paginated on the client in `sortTable()`. The component also exposes
 * permission-gated maintenance actions (resize partitions, set offsets by timestamp,
 * delete a consumer group, delete its DB data).
 */
@Component({
  selector: '',
  templateUrl: './pi-kafka-panel.component.html',
  styleUrls: ['./pi-kafka-panel.component.scss']
})
export class PIKafkaPanelComponent<T extends KafkaPanelItem = KafkaPanelItem> extends TableWithFilters implements OnInit, OnDestroy {
  // Static labels; not read inside this class (presumably bound by the template).
  readonly title = 'PI Kafka Panel'
  readonly paginationName = 'Consumer Groups'

  // NOTE(review): `ENV` is not imported in this file — presumably an ambient global; verify.
  ENVIRONMENT = ENV

  // Free-text filter; sortTable() matches it against topic, groupId and clusterId (substring, case-sensitive).
  filterValue: string = ''

  // Every row from the last refreshTable(), after PiKafkaPanelProcessorService.processMetadata().
  topics: T[] = []
  // Rows for the current page: `topics` sorted, filtered and sliced by sortTable().
  filtered: T[] = []

  // Result of PiKafkaPanelService.getMetadata(); undefined until the first successful refresh.
  metadata: KafkaMetadata
  // Query timings reported with the last data fetch; reset by clearSelections().
  queryTimes: KafkaTopicMetricsQueryTimes = { db: 0, kafka: 0 }

  // Dropdown filters. For each dimension:
  //   all*       – distinct values found in `topics` (rebuilt by refreshTable()).
  //   available* – options still reachable given the other dimensions' selections (rebuilt by sortTable()).
  //   current*   – selected options; an empty array means "no restriction" (sortTable() falls back to all*).
  allInstances: { id: string, text: string }[] = []
  availableInstances: { id: string, text: string }[] = []
  currentInstances: { id: string, text: string }[] = []

  allTopics: { id: string, text: string }[] = []
  availableTopics: { id: string, text: string }[] = []
  currentTopics: { id: string, text: string }[] = []

  allRegions: { id: string, text: string }[] = []
  availableRegions: { id: string, text: string }[] = []
  currentRegions: { id: string, text: string }[] = []

  allConsumerGroups: { id: string, text: string }[] = []
  availableConsumerGroups: { id: string, text: string }[] = []
  currentConsumerGroups: { id: string, text: string }[] = []

  // Sampling granularity sent to the backend; 'auto' is resolved from the selected time range in getAllTopics().
  availableGranularity: string[] = ['auto', 'min', 'hour', 'day']
  currentGranularity = this.availableGranularity[0]

  // Query switches forwarded by getAllTopics() as `liveFeed`, `showLastPoint` and `filterInactive`.
  liveDataToggle: boolean = true
  showLastPoint: boolean = true
  filterInactiveConsumerGroups: boolean = true

  // Summary figures recomputed by refreshTable()/sortTable(); cleared by clearSelections().
  totalMessages: string
  averageLag: string
  totalConsumers: number
  totalPartitions: number

  // Modal state. While `selectedTopic` is set, refreshTable() is skipped unless called with ignoreSelected.
  selectedTopic: T
  // Not read inside this class (presumably a template flag).
  showDatePicker = true

  // Table: column definitions (assigned in the constructor) and the row property sortTable() orders by.
  readonly tableHeaders: TableHeader[]
  currentSortBy: string

  // Time-range options; presumably overrides a TableWithFilters member — confirm.
  // The second entry is the default, and getAllTopics() calls `currentTime.time()` for `fromDate`.
  //@Override
  readonly timeFilter = MethodsService.timeObject()
  currentTime = this.timeFilter[1]

  // Guard that stops refreshTable() from running concurrently with itself.
  _isRefreshing: boolean = false

  // MethodsService helpers exposed as members; not called inside this class (presumably for the template).
  textToClip = MethodsService.copyToClipboard
  integerWithCommas = MethodsService.integerWithCommas

  // Handle of the pending onFilterChange() timer. Typed `void | number` because it is
  // also assigned the (void) result of clearTimeout().
  private _debounce: void | number
  // Permission flags, all computed once in the constructor from UserDataService.hasPermissions().
  isEligibleToPerformAnyAction: boolean = false
  isEligibleToResizePartitions: boolean = false
  isEligibleToAlterOffset: boolean = false
  isEligibleToDeleteConsumerGroup: boolean = false
  isEligibleToDeleteConsumerGroupData: boolean = false
  // Delay, in milliseconds, passed to setTimeout() by onFilterChange().
  private debounce_timeout: number = 75

  /**
   * Resolves the per-action permission flags, picks the default page size and
   * builds the table column definitions.
   *
   * NOTE(review): `_zone` is injected but not referenced anywhere in this class.
   */
  constructor (
    private _userData: UserDataService,
    private _kafka: PiKafkaPanelService,
    private _zone: NgZone,
    private _state: GlobalState,
  ) {
    super()

    // Every maintenance action is checked against the ADMIN permission for its own route.
    const hasAdminPermission = (route: string) => this._userData.hasPermissions(route, UserPermission.ADMIN)

    this.isEligibleToResizePartitions = hasAdminPermission('/kafka_panel/resize_partitions')
    this.isEligibleToAlterOffset = hasAdminPermission('/kafka_panel/consumer_group_ts_offset')
    this.isEligibleToDeleteConsumerGroup = hasAdminPermission('/kafka_panel/delete_consumer_group')
    this.isEligibleToDeleteConsumerGroupData = hasAdminPermission('/kafka_panel/delete_consumer_group_db_data')

    // True as soon as at least one of the individual actions is allowed.
    this.isEligibleToPerformAnyAction =
      this.isEligibleToResizePartitions ||
      this.isEligibleToAlterOffset ||
      this.isEligibleToDeleteConsumerGroup ||
      this.isEligibleToDeleteConsumerGroupData

    // Default page size is the second option of the inherited page-size list.
    this.currentItemsPerPage = this.itemsPerPageFilter[1]

    // Columns with an empty name reuse the `field` of the column before them.
    // The "Actions" column has no `field` at all.
    const headers = [
      //@formatter:off
      // { name: '',                sortable: false, field: ''               },
      { name: 'Last Sample',     sortable: true,  field: 'updatedAt'      },
      { name: 'Region',          sortable: true,  field: 'region'         },
      { name: 'Instance ID',     sortable: true,  field: 'clusterId'      },
      { name: 'Consumer Group',  sortable: true,  field: 'pimGroupId'     },
      // { name: 'Topic',           sortable: true,  field: 'pimTopic'       },
      { name: 'Consumer Lag',    sortable: true,  field: 'messagesBehind' },
      { name: '',                sortable: false, field: 'messagesBehind' },
      { name: 'Messages Count',  sortable: true,  field: 'totalMessages'  },
      { name: '',                sortable: false, field: 'totalMessages'  },
      { name: 'Consumers Count', sortable: true,  field: 'numOfConsumers' },
      { name: '',                sortable: false, field: 'numOfConsumers' },
      { name: 'Actions',         sortable: false,                         },
      //@formatter:on
    ]
    this.tableHeaders = headers.filter(header => Boolean(header))

    // Initial sort column: the first header bound to `messagesBehind`.
    const defaultSortHeader = this.tableHeaders.find(header => header.field === 'messagesBehind')
    this.currentSortBy = defaultSortHeader!.field
  }

  /**
   * Keyboard handler: pressing Escape while a topic is selected clears the
   * selection through `clearSelections()`. Any other key, or no selection, is ignored.
   *
   * @param event keyboard event whose `key` is inspected
   */
  registerModalListeners (event: KeyboardEvent) {
    const escapePressed = event.key === 'Escape'
    if (this.selectedTopic && escapePressed) {
      this.clearSelections()
    }
  }

  /**
   * Angular teardown hook.
   *
   * Cancels a pending `onFilterChange()` debounce timer so that it cannot call
   * `sortTable()` / `refreshTable()` on a component that has already been destroyed.
   */
  ngOnDestroy (): void {
    if (this._debounce) {
      clearTimeout(this._debounce)
      this._debounce = undefined
    }

    // NOTE(review): `bind()` returns a brand-new function on every call, so this can never
    // match a listener registered earlier, and no matching addEventListener call is visible
    // in this class. If a window listener really is registered elsewhere, the bound handler
    // has to be stored once and that same reference passed to both add and remove.
    window.removeEventListener('keyup', this.registerModalListeners.bind(this))
  }

  /**
   * Angular init hook: starts the first (foreground) load and registers a
   * background refresh for every `BACKGROUND_REFRESH` notification.
   *
   * NOTE(review): the subscription is never released in this class — confirm whether
   * GlobalState offers a way to unsubscribe.
   */
  ngOnInit (): void {
    this.refreshTable()

    const onBackgroundRefresh = () => this.refreshTable(true)
    this._state.subscribe(GlobalNotification.BACKGROUND_REFRESH, onBackgroundRefresh)
  }

  /**
   * Rebuilds `filtered` (the rows of the current page) from `topics`.
   *
   * Steps, in order:
   *  1. sort by `currentSortBy` (descending by default, ascending when `isSortReversed`);
   *  2. apply the free-text filter and the instance / topic / consumer-group / region filters;
   *  3. recompute the options still available in each filter dropdown;
   *  4. recompute the summary figures for the filtered rows;
   *  5. slice out the current page.
   *
   * Sorting rules: two values are compared numerically only when BOTH are numbers or
   * strings that are numeric in their entirety. The earlier `parseInt`-based coercion
   * truncated decimals and compared strings such as ISO timestamps or "10-foo" by their
   * leading digits only. A number always ranks before a non-number, and a Date before a
   * non-Date (order flipped when reversed); anything else falls back to `<` / `>`.
   */
  sortTable () {
    const sortKey = this.currentSortBy
    // +1 sorts ascending (reversed), -1 sorts descending (default).
    const direction = this.isSortReversed ? 1 : -1

    const wholeNumber = /^-?\d+(?:\.\d+)?$/
    // Returns the numeric value of `value`, or undefined when it is not entirely numeric.
    const asNumber = (value: unknown): number | undefined => {
      if (typeof value === 'number') {
        return Number.isNaN(value) ? undefined : value
      }
      if (typeof value === 'string' && wholeNumber.test(value.trim())) {
        return Number(value)
      }
      return undefined
    }

    const compareRows = (a: T, b: T): number => {
      let left = a[sortKey]
      let right = b[sortKey]

      // Only switch to numeric comparison when both sides are numeric.
      const leftNumber = asNumber(left)
      const rightNumber = asNumber(right)
      if (leftNumber !== undefined && rightNumber !== undefined) {
        left = leftNumber
        right = rightNumber
      }

      const leftIsNumber = typeof left === 'number'
      const rightIsNumber = typeof right === 'number'
      if (leftIsNumber && rightIsNumber) {
        return direction * (left - right)
      }
      if (leftIsNumber) {
        return direction
      }
      if (rightIsNumber) {
        return -direction
      }

      const leftIsDate = left instanceof Date
      const rightIsDate = right instanceof Date
      if (leftIsDate && rightIsDate) {
        return direction * (left.getTime() - right.getTime())
      }
      if (leftIsDate) {
        return direction
      }
      if (rightIsDate) {
        return -direction
      }

      if (left > right) {
        return direction
      }
      if (left < right) {
        return -direction
      }
      return 0
    }

    // Sort a copy so `topics` keeps the order it was loaded in.
    this.filtered = [...this.topics].sort(compareRows)

    const needle = this.filterValue
    if (needle) {
      this.filtered = this.filtered.filter(row =>
        row.topic.includes(needle) ||
        row.groupId.includes(needle) ||
        row.clusterId?.includes(needle)
      )
    }

    // An empty selection means "everything": fall back to the full option list.
    const idsOf = (options: { id: string }[]) => options.map(option => option.id)
    const instances = idsOf(this.currentInstances.length ? this.currentInstances : this.allInstances)
    const topics = idsOf(this.currentTopics.length ? this.currentTopics : this.allTopics)
    const consumerGroups = idsOf(this.currentConsumerGroups.length ? this.currentConsumerGroups : this.allConsumerGroups)
    const regions = idsOf(this.currentRegions.length ? this.currentRegions : this.allRegions)

    // Sets give constant-time membership checks inside the row loops below.
    const instanceSet = new Set(instances)
    const topicSet = new Set(topics)
    const consumerGroupSet = new Set(consumerGroups)
    const regionSet = new Set(regions)

    // A dimension only filters rows when its selection is a real subset of all options.
    if (instances.length && instances.length !== this.allInstances.length) {
      this.filtered = this.filtered.filter(row => instanceSet.has(row.clusterId))
    }

    if (topics.length && topics.length !== this.allTopics.length) {
      this.filtered = this.filtered.filter(row => topicSet.has(row.pimTopic))
    }

    if (consumerGroups.length && consumerGroups.length !== this.allConsumerGroups.length) {
      this.filtered = this.filtered.filter(row => consumerGroupSet.has(row.pimGroupId))
    }

    if (regions.length && regions.length !== this.allRegions.length) {
      this.filtered = this.filtered.filter(row => regionSet.has(row.region))
    }

    // Update the dropdown options: an option stays available when at least one loaded row
    // carries it AND satisfies the selections of the three OTHER dimensions.
    if (instances.length || topics.length || consumerGroups.length || regions.length) {
      this.availableInstances = this.allInstances.filter(option =>
        this.topics.some(row => option.id === row.clusterId && topicSet.has(row.pimTopic) && consumerGroupSet.has(row.pimGroupId) && regionSet.has(row.region))
      )
      this.availableTopics = this.allTopics.filter(option =>
        this.topics.some(row => option.id === row.pimTopic && consumerGroupSet.has(row.pimGroupId) && instanceSet.has(row.clusterId) && regionSet.has(row.region))
      )
      this.availableConsumerGroups = this.allConsumerGroups.filter(option =>
        this.topics.some(row => option.id === row.pimGroupId && topicSet.has(row.pimTopic) && instanceSet.has(row.clusterId) && regionSet.has(row.region))
      )
      this.availableRegions = this.allRegions.filter(option =>
        this.topics.some(row => option.id === row.region && topicSet.has(row.pimTopic) && instanceSet.has(row.clusterId) && consumerGroupSet.has(row.pimGroupId))
      )
    } else {
      this.availableInstances = [...this.allInstances]
      this.availableTopics = [...this.allTopics]
      this.availableConsumerGroups = [...this.allConsumerGroups]
      this.availableRegions = [...this.allRegions]
    }

    const rows = this.filtered
    this.numOfAvailableDocs = rows.length

    // Summary figures are recalculated for the filtered rows (before pagination).
    const lagSum = rows.reduce((sum, row) => sum + row._lagPercent, 0)
    this.averageLag = ((rows.length ? (lagSum * 100 / rows.length) : 0) / 100).toFixed(1) + '%'

    // Several rows can share one topic; count each topic's messages only once.
    const countedTopics = new Set<string>()
    let messageSum = 0
    for (const row of rows) {
      if (!countedTopics.has(row.topic)) {
        countedTopics.add(row.topic)
        messageSum += row.totalMessages
      }
    }
    this.totalMessages = MethodsService.numberToSuffix(messageSum)
    this.totalConsumers = rows.reduce((sum, row) => sum + row.numOfConsumers, 0)
    this.totalPartitions = rows.reduce((sum, row) => sum + row.numOfPartitions, 0)

    // Keep only the rows of the current page.
    const pageStart = (this.currentPage - 1) * this.currentItemsPerPage
    const pageEnd = this.currentItemsPerPage * this.currentPage
    this.filtered = rows.slice(pageStart, pageEnd)
  }

  /**
   * Debounced reaction to a filter change: restarts a `debounce_timeout` ms timer on
   * every call and, once it fires, either re-sorts the loaded rows or reloads them.
   *
   * @param item    value emitted by the filter control (unused)
   * @param useSort true to only re-run `sortTable()`; false to call `refreshTable()`
   */
  onFilterChange (item: any, useSort = false) {
    // Drop the timer of a previous call that has not fired yet.
    if (this._debounce) {
      clearTimeout(this._debounce)
      this._debounce = undefined
    }

    const applyChange = () => {
      if (useSort) {
        return this.sortTable()
      }
      return this.refreshTable()
    }
    this._debounce = setTimeout(applyChange, this.debounce_timeout) as any as number
  }

  /**
   * Builds the cluster "overview" URL from the loaded `metadata`
   * (`baseUrl`, `confluentEnvId`, `confluentClusterId`).
   *
   * @returns the URL string; throws a TypeError if `metadata` has not been loaded yet
   */
  confluentClusterOverview (): string {
    const { baseUrl, confluentEnvId, confluentClusterId } = this.metadata
    return `${baseUrl}/${confluentEnvId}/clusters/${confluentClusterId}/overview`
  }

  /**
   * Prompts for a new partition count for `topic` and, when confirmed, asks the backend
   * to resize the topic. Shows a toast with the outcome and reloads the table on success.
   *
   * The input must consist of digits only and be larger than the current partition count.
   * The earlier check used an unanchored `/\d+/` plus `parseInt`, so input such as "12abc"
   * passed validation and was then sent as `NaN` after the `Number()` conversion.
   *
   * @param topic row whose topic should be resized
   */
  async resizePartitions (topic: T) {
    const isValidPartitionCount = (str: string): boolean => {
      const text = String(str ?? '').trim()
      if (!/^\d+$/.test(text)) {
        return false
      }
      const requested = Number(text)
      return Number.isSafeInteger(requested) && requested > topic.numOfPartitions
    }

    const answer = await MethodsService.prompt(`Resize Topic Partitions`,
      `You are about to resize the partitions for topic <b class="text-nowrap">${topic.topic}</b><br><br><b>Notes:</b><br>&nbsp;&nbsp;- Partitions can be only scaled up.<br>&nbsp;&nbsp;- Any attached consumer group will be deleted.`,
      `${topic.numOfPartitions}`,
      { title: 'Input Error', message: `Must be a positive Integer larger than ${topic.numOfPartitions}` },
      'Rescale',
      isValidPartitionCount)

    // A falsy result means the prompt was dismissed.
    if (!answer) {
      return
    }

    const body: KafkaResizeTopicRequestParams = {
      topic: topic.topic,
      numOfPartitions: Number(answer.input)
    }

    try {
      SpinnerService.spin('mini')
      const response = await this._kafka.resizeTopicPartitions(body)
      if (!response.success) {
        MethodsService.toast(
          'error',
          'Failed resizing partitions',
          response.error || response.data
        )
      } else {
        MethodsService.toast(
          'success',
          `Partitions Added Successfully!`
        )
        await this.refreshTable()
      }
    } catch (e) {
      LoggerService.error(e)
    } finally {
      // Always release the spinner, even if error logging itself fails.
      SpinnerService.stop('mini')
    }
  }

  /**
   * Prompts for a date and, when confirmed, asks the backend to move the offsets of the
   * row's consumer group (for the row's topic) to that timestamp. Shows a toast with the
   * outcome and reloads the table on success.
   *
   * The input is validated the same way it is consumed below: it must convert to a valid
   * `Date`. The earlier validator was copied from the partition-resize prompt and compared
   * the leading integer of the entered date against the topic's partition count.
   *
   * @param topic row identifying the topic and consumer group to reset
   */
  async setConsumerGroupOffset (topic: T) {
    const isValidDate = (str: string): boolean => !Number.isNaN(new Date(str).getTime())

    const answer = await MethodsService.prompt(`Set Consumer Group Offset By Timestamp`,
      `Set the consumer group <b class="text-nowrap">${topic.groupId}</b> offsets for all the partitions to the desired date.  <br><br><b>Notes:</b><br>&nbsp;&nbsp;- Consumers of this group has to be shut before resetting the offsets.<br>&nbsp;&nbsp;- Maximum of 14 Days backwards.`,
      ``,
      undefined,
      'Apply',
      isValidDate,
      true
    )

    // A falsy result means the prompt was dismissed.
    if (!answer) {
      return
    }

    const body: KafkaSetConsumerOffsetByTimestampOptions = {
      topic: topic.topic,
      groupId: topic.groupId,
      timestamp: new Date(answer.input).getTime()
    }

    try {
      SpinnerService.spin('mini')
      const response = await this._kafka.setConsumerGroupOffset(body)
      if (!response.success) {
        MethodsService.toast(
          'error',
          'Failed setting consumer offsets.',
          response.error || response.data
        )
      } else {
        MethodsService.toast(
          'success',
          `Consumer group offsets changed successfully!`
        )
        await this.refreshTable()
      }
    } catch (e) {
      LoggerService.error(e)
    } finally {
      // Always release the spinner, even if error logging itself fails.
      SpinnerService.stop('mini')
    }
  }

  /**
   * Asks for confirmation and then requests deletion of the row's consumer group.
   * Shows a toast with the outcome and reloads the table on success.
   *
   * The confirmation note previously read "before resetting the offsets", copied from
   * the offset-reset dialog; it now refers to deleting the group.
   *
   * @param topic row whose `groupId` identifies the consumer group to delete
   */
  async deleteConsumerGroup (topic: T) {
    const confirmed = await MethodsService.confirm(`Delete Consumer Group`,
      `You are about to delete <b class="text-nowrap">${topic.groupId}</b><br><br><b>Notes:</b><br>&nbsp;&nbsp;- Consumers of this group has to be shut before deleting the group.`,
      'Confirm',
    )

    // A falsy result means the dialog was dismissed.
    if (!confirmed) {
      return
    }

    const body: KafkaDeleteConsumerGroupOptions = {
      groupId: topic.groupId,
    }

    try {
      SpinnerService.spin('mini')
      const response = await this._kafka.deleteConsumerGroup(body)
      if (!response.success) {
        MethodsService.toast(
          'error',
          'Failed deleting consumer group',
          response.error || response.data
        )
      } else {
        MethodsService.toast(
          'success',
          `Consumer group deleted successfully!`
        )
        await this.refreshTable()
      }
    } catch (e) {
      LoggerService.error(e)
    } finally {
      // Always release the spinner, even if error logging itself fails.
      SpinnerService.stop('mini')
    }
  }

  /**
   * Asks for confirmation and then requests deletion of the stored DB data of the row's
   * consumer group (per the dialog text, the Kafka consumer group itself is not removed).
   * Shows a toast with the outcome and reloads the table on success.
   *
   * @param topic row whose `groupId` identifies the consumer group
   */
  async deleteConsumerGroupData (topic: T) {
    const confirmed = await MethodsService.confirm(
      `Delete Consumer Group DB Data`,
      `You are about to delete the MongoDB Data for <b class="text-nowrap">${topic.groupId}</b><br><br><b>Notes:</b><br>&nbsp;&nbsp;- this will not remove the actual Consumer Group from Kafka.`,
      'Confirm',
    )
    if (!confirmed) {
      return
    }

    const request: KafkaDeleteConsumerGroupOptions = { groupId: topic.groupId }

    try {
      SpinnerService.spin('mini')
      const result = await this._kafka.deleteConsumerGroupData(request)
      if (result.success) {
        MethodsService.toast('success', `Consumer group MongoDB Data Deleted successfully!`)
        await this.refreshTable()
      } else {
        MethodsService.toast('error', 'Failed deleting consumer group', result.error || result.data)
      }
    } catch (err) {
      LoggerService.error(err)
    }
    SpinnerService.stop('mini')
  }

  /**
   * Reloads rows and metadata from the backend, rebuilds the filter option lists and
   * re-runs `sortTable()`.
   *
   * The call is a no-op while another refresh is in flight, or while a topic is selected
   * (unless `ignoreSelected` is true). Fetch errors are logged, not rethrown.
   *
   * The in-flight flag is now reset in an inner `finally`. Previously an exception thrown
   * by `sortTable()` left `_isRefreshing` stuck at true, which silently disabled every
   * later refresh.
   *
   * @param background     true for a silent refresh: keeps the current rows and selection
   *                       and does not show the spinner
   * @param ignoreSelected true to refresh even while `selectedTopic` is set
   */
  async refreshTable (background: boolean = false, ignoreSelected: boolean = false): Promise<void> {
    if (this._isRefreshing || (!ignoreSelected && this.selectedTopic)) {
      return
    }
    this._isRefreshing = true
    try {
      if (!background) {
        // Foreground refresh: empty the table and summary while loading.
        this.topics = []
        this.clearSelections()
        SpinnerService.spin('mini')
      }
      const [{ data, count, queryTimes }, metadata] = await Promise.all([
        this.getAllTopics(),
        this._kafka.getMetadata(),
      ])
      this.metadata = metadata
      this.queryTimes = queryTimes
      this.numOfAvailableDocs = count
      this.topics = data.map(topic => PiKafkaPanelProcessorService.processMetadata(topic, metadata)) as T[]

      // Distinct, non-empty, sorted values of one row property, as dropdown options.
      const toOptions = (values: string[]) =>
        [...new Set(values)].filter(Boolean).sort().map(value => ({ id: value, text: value }))

      this.allInstances = toOptions(this.topics.map(t => t.clusterId))
      this.allTopics = toOptions(this.topics.map(t => t.pimTopic))
      this.allConsumerGroups = toOptions(this.topics.map(t => t.pimGroupId))
      this.allRegions = toOptions(this.topics.map(t => t.region))

      // Unfiltered totals; several rows can share one topic, so each topic's messages
      // are counted only once.
      const countedTopics = new Set<string>()
      let messageSum = 0
      for (const row of this.topics) {
        if (!countedTopics.has(row.topic)) {
          countedTopics.add(row.topic)
          messageSum += row.totalMessages
        }
      }
      this.totalMessages = MethodsService.numberToSuffix(messageSum)
      this.totalConsumers = this.topics.reduce((sum, row) => sum + row.numOfConsumers, 0)
      this.totalPartitions = this.topics.reduce((sum, row) => sum + row.numOfPartitions, 0)
    } catch (e) {
      LoggerService.error(e)
    } finally {
      try {
        if (!background) {
          SpinnerService.stop('mini')
        }
        // Runs after failures too, so the view always reflects the current `topics`.
        this.sortTable()
      } finally {
        this._isRefreshing = false
      }
    }

  }

  /**
   * Drops the selected topic and blanks the summary figures and query timings.
   * Called on Escape and at the start of every foreground refresh.
   */
  private clearSelections () {
    this.selectedTopic = undefined

    // Summary figures stay undefined until the next sortTable()/refreshTable() pass.
    this.totalMessages = this.averageLag = undefined
    this.totalConsumers = this.totalPartitions = undefined

    this.queryTimes = { db: 0, kafka: 0 }
  }

  /**
   * Fetches the topic metrics for the current time range and query switches.
   *
   * With granularity 'auto', the value sent is derived from the position of the selected
   * time range in `timeFilter`: index 5 and above → 'day', index 3–4 → 'hour',
   * otherwise 'min'. (The original comments describe these as "> 7 days" and
   * "3–7 days" — presumably matching the entries of `MethodsService.timeObject()`; verify.)
   *
   * NOTE(review): `page` is sent together with an effectively unlimited `limit`, while
   * paging is done client-side in sortTable() — confirm the backend ignores `page` here.
   *
   * @returns the counted result resolved by `PiKafkaPanelService.getAll()`
   */
  private async getAllTopics (): Promise<KafkaTopicMetricsCountedResult> {
    const options: FilterArguments<KafkaTopicMetricsFilters> = {
      fromDate: this.currentTime.time(),
      limit: Number.MAX_SAFE_INTEGER,
      page: this.currentPage - 1,
      liveFeed: this.liveDataToggle,
      showLastPoint: this.showLastPoint,
      filterInactive: this.filterInactiveConsumerGroups
    }

    if (this.currentGranularity !== 'auto') {
      // Explicit choice: pass it through unchanged.
      options.granularity = this.currentGranularity as CollectionIndexStatsFilters['granularity']
    } else {
      const rangeIndex = this.timeFilter.indexOf(this.currentTime)
      options.granularity = rangeIndex >= 5 ? 'day' : rangeIndex >= 3 ? 'hour' : 'min'
    }

    const result = await this._kafka.getAll(options)
    return result
  }

}
