"""Tests for blueberry.pbap.bluetooth_pbap."""

import os
import random
import time

from mobly import asserts
from mobly import test_runner
from mobly import signals
from mobly import utils

from mobly.controllers import android_device

from blueberry.utils import blueberry_base_test
from blueberry.utils import bt_constants
from blueberry.utils import bt_test_utils

# Directory on the device where the generated vcf files are pushed before
# being imported.
STORAGE_PATH = '/storage/emulated/0'

# URI for contacts database.
CONTACTS_URI = 'content://com.android.contacts/data/phones'

# Number of seconds to wait for contacts and call logs to reach the expected
# count, both after generating them on PSE and after downloading them on PCE.
WAITING_TIMEOUT_SEC = 60

# Number of contacts, and number of call logs per call log type, to be tested.
TEST_DATA_COUNT = 1000

# Permissions granted to the Contacts app (com.google.android.contacts) during
# class setup.
PERMISSION_LIST = [
    'android.permission.READ_CONTACTS',
    'android.permission.WRITE_CONTACTS',
]


class BluetoothPbapTest(blueberry_base_test.BlueberryBaseTest):
  """Test Class for Bluetooth PBAP Test."""

  def setup_class(self):
    """Sets up the PSE phone and the PCE carkit for the PBAP tests.

    Initializes the primary phone, grants the Contacts app its permissions on
    both devices, pairs and connects the phone with the carkit, and allows the
    PBAP profile access for the carkit on the phone.
    """
    super(BluetoothPbapTest, self).setup_class()

    # Bluetooth carkit which role is Phone Book Client Equipment (PCE).
    self.derived_bt_device = self.derived_bt_devices[0]

    # Primary phone which role is Phone Book Server Equipment (PSE).
    self.pri_phone = self.android_devices[0]
    self.pri_phone.init_setup()
    self.pri_phone.sl4a_setup()
    self.derived_bt_device.add_sec_ad_device(self.pri_phone)

    # Grant the permissions to Contacts app.
    for device in [self.pri_phone, self.derived_bt_device]:
      # Work on a per-device copy: appending to PERMISSION_LIST itself would
      # leak READ_EXTERNAL_STORAGE into the next device's list (and into any
      # later use of the constant) regardless of that device's SDK level.
      required_permissions = list(PERMISSION_LIST)
      # App requires READ_EXTERNAL_STORAGE to read contacts if SDK < 30.
      if int(device.build_info['build_version_sdk']) < 30:
        required_permissions.append('android.permission.READ_EXTERNAL_STORAGE')
      for permission in required_permissions:
        device.adb.shell('pm grant com.google.android.contacts %s' % permission)
    self.pse_mac_address = self.pri_phone.get_bluetooth_mac_address()
    mac_address = self.derived_bt_device.get_bluetooth_mac_address()
    self.derived_bt_device.activate_pairing_mode()
    self.pri_phone.pair_and_connect_bluetooth(mac_address)
    # Sleep until the connection stabilizes.
    time.sleep(5)

    # Allow permission access for PBAP profile.
    self.pri_phone.sl4a.bluetoothChangeProfileAccessPermission(
        mac_address,
        bt_constants.BluetoothProfile.PBAP.value,
        bt_constants.BluetoothAccessLevel.ACCESS_ALLOWED.value)

  def setup_test(self):
    """Runs the base per-test setup, then drops any live PBAP connection."""
    super().setup_test()
    # Every test has to start from a disconnected PBAP state.
    self._terminate_pbap_connection()

  def _import_vcf_to_pse(self, file_name, expected_contact_count):
    """Imports a vcf file into PSE's contacts through the Contacts app UI.

    Args:
      file_name: String, name of the vcf file as it is listed in PickActivity.
      expected_contact_count: Int, number of contacts expected on PSE once the
        import has finished.

    Raises:
      android_device.DeviceError: If the file is not listed in PickActivity, or
        if the contact count does not reach the expected value within
        WAITING_TIMEOUT_SEC seconds.
    """
    phone = self.pri_phone

    # Launch ImportVCardActivity and confirm its pop-up dialog with "OK"; this
    # brings up PickActivity, which browses the existing vcf files.
    phone.adb.shell(
        'am start com.google.android.contacts/'
        'com.google.android.apps.contacts.vcard.ImportVCardActivity')
    phone.aud(text='OK').click()

    # The vcf file has to be listed in PickActivity before it can be picked.
    file_is_listed = phone.aud(text=file_name).exists()
    if not file_is_listed:
      raise android_device.DeviceError(
          phone,
          'No file name matches "%s" in PickActivity.' % file_name)

    # TODO(user): Remove the check of code name for S build.
    is_s_build = phone.build_info['build_version_codename'] == 'S'
    if is_s_build or int(phone.build_info['build_version_sdk']) > 30:
      phone.aud(text=file_name).click()
    else:
      # `adb shell input tap` cannot work in PickActivity before R build, so
      # the vcf file is selected and imported with TAB and ENTER key events.
      if phone.aud(content_desc='Grid view').exists():
        # ENTER key event cannot work in List mode on git_rvc-d2-release
        # branch, so switch to Grid mode first.
        phone.aud(content_desc='Grid view').click()
      phone.aud.send_key_code('KEYCODE_TAB')
      phone.aud.send_key_code('KEYCODE_ENTER')

    phone.log.info('Importing "%s"...' % file_name)
    actual_count = self._wait_and_get_contact_count(
        phone, expected_contact_count, WAITING_TIMEOUT_SEC)
    if actual_count == expected_contact_count:
      phone.log.info('Successfully added %d contact(s).' % actual_count)
      return
    raise android_device.DeviceError(
        phone,
        'Failed to import %d contact(s) within %ds. Actual count: %d' %
        (expected_contact_count, WAITING_TIMEOUT_SEC, actual_count))

  def _generate_contacts_on_pse(self,
                                num_of_contacts,
                                first_name=None,
                                last_name=None,
                                phone_number=None):
    """Generates contacts to be tested on PSE.

    Creates a vcf file under the phone's log path, pushes it to STORAGE_PATH on
    PSE, imports it through the Contacts app and finally removes the pushed
    copy from the device, whether or not the import succeeded.

    Args:
      num_of_contacts: Int, number of contacts to create and import.
      first_name: Forwarded to bt_test_utils.create_vcf_from_vcard.
      last_name: Forwarded to bt_test_utils.create_vcf_from_vcard.
      phone_number: Forwarded to bt_test_utils.create_vcf_from_vcard.

    Raises:
      android_device.DeviceError: If importing the vcf file fails.
    """
    vcf_file = bt_test_utils.create_vcf_from_vcard(
        output_path=self.pri_phone.log_path,
        num_of_contacts=num_of_contacts,
        first_name=first_name,
        last_name=last_name,
        phone_number=phone_number)
    self.pri_phone.adb.push([vcf_file, STORAGE_PATH])
    # `vcf_file` is a host path, so split it with the host's path rules.
    file_name = os.path.basename(vcf_file)
    # The device path always uses '/', independently of the host OS.
    device_file_path = '%s/%s' % (STORAGE_PATH, file_name)
    try:
      # For R build, since the pushed vcf file probably not found when
      # importing contacts, do a media scan to recognize the file.
      if int(self.pri_phone.build_info['build_version_sdk']) > 29:
        self.pri_phone.adb.shell(
            'content call --uri content://media/ --method '
            'scan_volume --arg external_primary')
      self._import_vcf_to_pse(file_name, num_of_contacts)
    finally:
      # Do not leave the pushed file behind when the import fails; a stale vcf
      # file would still be listed in PickActivity for later imports.
      self.pri_phone.adb.shell('rm -rf %s' % device_file_path)

  def _generate_call_logs_on_pse(self, call_log_type, num_of_call_logs):
    """Adds call logs of one type to PSE and waits until they are all present.

    Args:
      call_log_type: String, type of the call logs to add.
      num_of_call_logs: Int, number of call logs to add.

    Raises:
      android_device.DeviceError: If the number of call logs of this type on
        PSE does not equal `num_of_call_logs` within WAITING_TIMEOUT_SEC
        seconds.
    """
    phone = self.pri_phone
    phone.log.info('Putting %d call log(s) which type are "%s"...' %
                   (num_of_call_logs, call_log_type))

    for _ in range(num_of_call_logs):
      random_number = '8809%d' % random.randrange(int(10e8))
      # Seconds reported by `date` on the device, converted to milliseconds
      # below.
      device_time_sec = float(phone.adb.shell('date +%s.%N'))
      phone.sl4a.callLogsPut({
          'type': call_log_type,
          'number': random_number,
          'time': int(1000 * device_time_sec),
      })

    actual_count = self._wait_and_get_call_log_count(
        phone, call_log_type, num_of_call_logs, WAITING_TIMEOUT_SEC)
    if actual_count == num_of_call_logs:
      phone.log.info('Successfully added %d call log(s).' % actual_count)
      return
    raise android_device.DeviceError(
        phone,
        'Failed to generate %d call log(s) within %ds. '
        'Actual count: %d, Call log type: %s' %
        (num_of_call_logs, WAITING_TIMEOUT_SEC, actual_count, call_log_type))

  def _wait_and_get_contact_count(self,
                                  device,
                                  expected_contact_count,
                                  timeout_sec):
    """Waits for contact update for a period time and returns contact count.

    This method should be used when a device imports some new contacts. The
    contact count is queried at least once, even when `timeout_sec` is zero or
    negative, and then about once per second until it equals
    `expected_contact_count` or the timeout has elapsed. A warning is logged
    when the expectation is not met.

    Args:
      device: AndroidDevice, Mobly Android controller class.
      expected_contact_count: Int, Number of contacts as expected.
      timeout_sec: Int, Number of seconds to wait for contact update.

    Returns:
      current_count: Int, number of the existing contacts on the device at the
      last query.
    """
    # A monotonic clock keeps the deadline unaffected by adjustments of the
    # host's wall clock while waiting.
    deadline = time.monotonic() + timeout_sec
    while True:
      current_count = device.sl4a.contactsGetCount()
      if current_count == expected_contact_count:
        return current_count
      if time.monotonic() >= deadline:
        break
      # Interval between attempts to get contacts.
      time.sleep(1)
    device.log.warning(
        'Failed to get expected contact count: %d. '
        'Actual contact count: %d.' %
        (expected_contact_count, current_count))
    return current_count

  def _wait_and_get_call_log_count(self,
                                   device,
                                   call_log_type,
                                   expected_call_log_count,
                                   timeout_sec):
    """Waits for call log update for a period time and returns call log count.

    This method should be used when a device adds some new call logs. The call
    logs of the given type are queried at least once, even when `timeout_sec`
    is zero or negative, and then about once per second until their number
    equals `expected_call_log_count` or the timeout has elapsed. A warning is
    logged when the expectation is not met.

    Args:
      device: AndroidDevice, Mobly Android controller class.
      call_log_type: String, Type of the call logs.
      expected_call_log_count: Int, Number of call logs as expected.
      timeout_sec: Int, Number of seconds to wait for call log update.

    Returns:
      current_count: Int, number of the existing call logs of this type on the
      device at the last query.
    """
    # A monotonic clock keeps the deadline unaffected by adjustments of the
    # host's wall clock while waiting.
    deadline = time.monotonic() + timeout_sec
    while True:
      current_count = len(device.sl4a.callLogsGet(call_log_type))
      if current_count == expected_call_log_count:
        return current_count
      if time.monotonic() >= deadline:
        break
      # Interval between attempts to get call logs.
      time.sleep(1)
    device.log.warning(
        'Failed to get expected call log count: %d. '
        'Actual call log count: %d.' %
        (expected_call_log_count, current_count))
    return current_count

  def _terminate_pbap_connection(self):
    """Disconnects PBAP on PCE unless it is already disconnected.

    Raises:
      signals.TestError: If PCE does not report the disconnected state 3
        seconds after the disconnect request.
    """
    pce = self.derived_bt_device
    pse_address = self.pse_mac_address

    def is_disconnected():
      # Asks PCE for its current PBAP client connection status towards PSE.
      status = pce.sl4a.bluetoothPbapClientGetConnectionStatus(pse_address)
      return (
          status == bt_constants.BluetoothConnectionStatus.STATE_DISCONNECTED)

    if is_disconnected():
      return

    pce.log.info('Disconnecting PBAP...')
    pce.sl4a.bluetoothPbapClientDisconnect(pse_address)
    # Buffer for the connection status check.
    time.sleep(3)
    if not is_disconnected():
      raise signals.TestError('PBAP connection failed to be terminated.')
    pce.log.info('Successfully disconnected PBAP.')

  def test_download_contacts(self):
    """Test for the feature of downloading contacts.

    Tests that PCE can download contacts from PSE: TEST_DATA_COUNT contacts
    are generated on PSE, PCE connects via PBAP, and the contact count on PCE
    must reach TEST_DATA_COUNT within WAITING_TIMEOUT_SEC seconds.
    """
    # Make sure no any contacts exist on the devices.
    for device in [self.pri_phone, self.derived_bt_device]:
      device.sl4a.contactsEraseAll()

    # Add contacts to PSE.
    self._generate_contacts_on_pse(TEST_DATA_COUNT)

    # When PCE is connected to PSE, it will download PSE's contacts.
    self.derived_bt_device.pbap_connect()
    self.derived_bt_device.log.info('Downloading contacts from PSE...')
    current_count = self._wait_and_get_contact_count(
        self.derived_bt_device, TEST_DATA_COUNT, WAITING_TIMEOUT_SEC)

    asserts.assert_true(
        current_count == TEST_DATA_COUNT,
        'PCE failed to download %d contact(s) within %ds, '
        'actually downloaded %d contact(s).' %
        (TEST_DATA_COUNT, WAITING_TIMEOUT_SEC, current_count))
    # Report success only after the count has been verified, so that a failed
    # run does not log a misleading success message.
    self.derived_bt_device.log.info(
        'Successfully downloaded %d contact(s).', current_count)

  def test_download_call_logs(self):
    """Test for the feature of downloading call logs.

    Tests that PCE can download incoming/outgoing/missed call logs from PSE:
    TEST_DATA_COUNT call logs of each type are generated on PSE, PCE connects
    via PBAP, and for each type the call log count on PCE must reach
    TEST_DATA_COUNT within WAITING_TIMEOUT_SEC seconds.
    """
    # Make sure no any call logs exist on the devices.
    for device in [self.pri_phone, self.derived_bt_device]:
      device.sl4a.callLogsEraseAll()

    call_log_types = [
        bt_constants.INCOMING_CALL_LOG_TYPE,
        bt_constants.OUTGOING_CALL_LOG_TYPE,
        bt_constants.MISSED_CALL_LOG_TYPE,
    ]
    for call_log_type in call_log_types:
      # Add call logs to PSE.
      self._generate_call_logs_on_pse(call_log_type, TEST_DATA_COUNT)

    # When PCE is connected to PSE, it will download PSE's call logs.
    self.derived_bt_device.pbap_connect()
    self.derived_bt_device.log.info('Downloading call logs...')

    for call_log_type in call_log_types:
      current_count = self._wait_and_get_call_log_count(
          self.derived_bt_device,
          call_log_type,
          TEST_DATA_COUNT,
          WAITING_TIMEOUT_SEC)

      asserts.assert_true(
          current_count == TEST_DATA_COUNT,
          'PCE failed to download %d call log(s) which type are "%s" within %ds'
          ', actually downloaded %d call log(s).' %
          (TEST_DATA_COUNT, call_log_type, WAITING_TIMEOUT_SEC, current_count))
      # Report success only after the count has been verified, so that a
      # failed run does not log a misleading success message.
      self.derived_bt_device.log.info(
          'Successfully downloaded %d call log(s) which type are "%s".',
          current_count, call_log_type)

  def test_show_caller_name(self):
    """Test for caller name of the incoming phone call is correct on PCE.

    Tests that caller name matches contact name which is downloaded via PBAP.
    A contact holding the secondary phone's number is created on PSE and
    downloaded by PCE; the secondary phone then calls the primary phone and
    the caller name shown in PCE's notification is compared with the contact
    name. The phone call is always ended before the test returns.

    Raises:
      signals.TestError: If fewer than two Android devices are available, a
        phone has no loaded SIM state or no "phone_number" dimension, or the
        primary phone does not start ringing in time.
    """
    # Checks if two android devices exist.
    if len(self.android_devices) < 2:
      raise signals.TestError('This test requires two Android devices.')
    primary_phone = self.pri_phone
    secondary_phone = self.android_devices[1]
    secondary_phone.init_setup()
    for phone in [primary_phone, secondary_phone]:
      # Checks if SIM state is loaded for every devices.
      if not phone.is_sim_state_loaded():
        raise signals.TestError(f'Please insert a SIM Card to the phone '
                                f'"{phone.serial}".')
      # Checks if phone_number is provided in the support dimensions.
      phone.phone_number = phone.dimensions.get('phone_number')
      if not phone.phone_number:
        raise signals.TestError(f'Please add "phone_number" to support '
                                f'dimensions of the phone "{phone.serial}".')
    # Make sure no any contacts exist on the devices.
    for device in [primary_phone, self.derived_bt_device]:
      device.sl4a.contactsEraseAll()
    # Generate a contact name randomly.
    first_name = utils.rand_ascii_str(4)
    last_name = utils.rand_ascii_str(4)
    full_name = f'{first_name} {last_name}'
    primary_phone.log.info('Creating a contact "%s"...', full_name)
    self._generate_contacts_on_pse(
        num_of_contacts=1,
        first_name=first_name,
        last_name=last_name,
        phone_number=secondary_phone.phone_number)
    self.derived_bt_device.log.info('Connecting to PSE...')
    self.derived_bt_device.pbap_connect()
    self.derived_bt_device.log.info('Downloading contacts from PSE...')
    current_count = self._wait_and_get_contact_count(
        device=self.derived_bt_device,
        expected_contact_count=1,
        timeout_sec=WAITING_TIMEOUT_SEC)
    self.derived_bt_device.log.info('Successfully downloaded %d contact(s).',
                                    current_count)
    asserts.assert_equal(
        first=current_count,
        second=1,
        msg=f'Failed to download the contact "{full_name}".')
    secondary_phone.sl4a.telecomCallNumber(primary_phone.phone_number)
    # From here on a call is in progress: the outer `finally` ends it even if
    # the phone never rings or closing the notification shade fails.
    try:
      secondary_phone.log.info('Made a phone call to device "%s".',
                               primary_phone.serial)
      primary_phone.log.info(
          'Waiting for the incoming call from device "%s"...',
          secondary_phone.serial)
      is_ringing = primary_phone.wait_for_call_state(
          bt_constants.CALL_STATE_RINGING,
          bt_constants.CALL_STATE_TIMEOUT_SEC)
      if not is_ringing:
        raise signals.TestError(
            f'Timed out after {bt_constants.CALL_STATE_TIMEOUT_SEC}s waiting '
            f'for the incoming call from device "{secondary_phone.serial}".')
      try:
        self.derived_bt_device.aud.open_notification()
        target_node_match_dict = {
            'resource_id': 'android:id/line1',
            'child': {
                'resource_id': 'android:id/title'
            }
        }
        hfp_address = primary_phone.get_bluetooth_mac_address()
        target_node = self.derived_bt_device.aud(
            sibling=target_node_match_dict,
            text=f'Incoming call via HFP {hfp_address}')
        caller_name = target_node.get_attribute_value('text')
        message = (f'Caller name is incorrect. Actual: {caller_name}, '
                   f'Correct: {full_name}')
        # Asserts that caller name of the incoming phone call is correct in
        # the notification bar.
        asserts.assert_equal(
            first=caller_name,
            second=full_name,
            msg=message)
      finally:
        # Recovery action for the notification shade.
        self.derived_bt_device.aud.close_notification()
    finally:
      # Recovery action for the phone call.
      secondary_phone.sl4a.telecomEndCall()


# Hand control to Mobly's test runner when this file is executed as a script.
if __name__ == '__main__':
  test_runner.main()
