| #!/usr/bin/env python3 |
| # |
| # Copyright 2018 Google, Inc. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import logging |
| import os |
| import time |
| |
| from queue import Empty |
| |
| from acts import asserts |
| from acts import utils |
| from acts_contrib.test_utils.wifi.p2p import wifi_p2p_const as p2pconsts |
| import acts.utils |
| |
| |
| def is_discovered(event, ad): |
| """Check an Android device exist in WifiP2pOnPeersAvailable event or not. |
| |
| Args: |
| event: WifiP2pOnPeersAvailable which include all of p2p devices. |
| ad: The android device |
| Returns: |
| True: if an Android device exist in p2p list |
| False: if not exist |
| """ |
| for device in event['data']['Peers']: |
| if device['Name'] == ad.name: |
| ad.deviceAddress = device['Address'] |
| return True |
| return False |
| |
| |
| def check_disconnect(ad, timeout=p2pconsts.DEFAULT_TIMEOUT): |
| """Check an Android device disconnect or not |
| |
| Args: |
| ad: The android device |
| """ |
| ad.droid.wifiP2pRequestConnectionInfo() |
| # wait disconnect event |
| ad.ed.pop_event(p2pconsts.DISCONNECTED_EVENT, timeout) |
| |
| |
| def p2p_disconnect(ad): |
| """Invoke an Android device removeGroup to trigger p2p disconnect |
| |
| Args: |
| ad: The android device |
| """ |
| ad.log.debug("Disconnect") |
| ad.droid.wifiP2pRemoveGroup() |
| check_disconnect(ad) |
| |
| |
| def p2p_connection_ping_test(ad, target_ip_address): |
| """Let an Android device to start ping target_ip_address |
| |
| Args: |
| ad: The android device |
| target_ip_address: ip address which would like to ping |
| """ |
| ad.log.debug("Run Ping Test, %s ping %s " % (ad.serial, target_ip_address)) |
| asserts.assert_true( |
| acts.utils.adb_shell_ping(ad, |
| count=6, |
| dest_ip=target_ip_address, |
| timeout=20), "%s ping failed" % (ad.serial)) |
| |
| |
| def is_go(ad): |
| """Check an Android p2p role is Go or not |
| |
| Args: |
| ad: The android device |
| Return: |
| True: An Android device is p2p go |
| False: An Android device is p2p gc |
| """ |
| ad.log.debug("is go check") |
| ad.droid.wifiP2pRequestConnectionInfo() |
| ad_connect_info_event = ad.ed.pop_event( |
| p2pconsts.CONNECTION_INFO_AVAILABLE_EVENT, p2pconsts.DEFAULT_TIMEOUT) |
| if ad_connect_info_event['data']['isGroupOwner']: |
| return True |
| return False |
| |
| |
| def p2p_go_ip(ad): |
| """Get GO IP address |
| |
| Args: |
| ad: The android device |
| Return: |
| GO IP address |
| """ |
| ad.log.debug("p2p go ip") |
| ad.droid.wifiP2pRequestConnectionInfo() |
| ad_connect_info_event = ad.ed.pop_event( |
| p2pconsts.CONNECTION_INFO_AVAILABLE_EVENT, p2pconsts.DEFAULT_TIMEOUT) |
| ad.log.debug("p2p go ip: %s" % |
| ad_connect_info_event['data']['groupOwnerHostAddress']) |
| return ad_connect_info_event['data']['groupOwnerHostAddress'] |
| |
| |
| def p2p_get_current_group(ad): |
| """Get current group information |
| |
| Args: |
| ad: The android device |
| Return: |
| p2p group information |
| """ |
| ad.log.debug("get current group") |
| ad.droid.wifiP2pRequestGroupInfo() |
| ad_group_info_event = ad.ed.pop_event(p2pconsts.GROUP_INFO_AVAILABLE_EVENT, |
| p2pconsts.DEFAULT_TIMEOUT) |
| ad.log.debug( |
| "p2p group: SSID:%s, password:%s, owner address: %s, interface: %s" % |
| (ad_group_info_event['data']['NetworkName'], |
| ad_group_info_event['data']['Passphrase'], |
| ad_group_info_event['data']['OwnerAddress'], |
| ad_group_info_event['data']['Interface'])) |
| return ad_group_info_event['data'] |
| |
| def is_ongoing_peer_ready(peerConfig, waitForPin): |
| """Check whether the peer config is ready |
| |
| Args: |
| peerConfig: the ongoing config |
| waitForPin: this config needs key or not |
| Return: |
| true for ready; false otherwise. |
| """ |
| if peerConfig is None: |
| return False |
| if not peerConfig['data'][WifiP2PEnums.WifiP2pConfig.DEVICEADDRESS_KEY]: |
| return False |
| if not waitForPin: |
| return True |
| if WifiP2PEnums.WpsInfo.WPS_PIN_KEY in peerConfig['data'][ |
| WifiP2PEnums.WifiP2pConfig.WPSINFO_KEY]: |
| return True |
| return False |
| |
| def wait_for_ongoing_peer_ready(ad, waitForPin, maxPollingCount): |
| """wait for the ongoing peer data ready |
| |
| Args: |
| ad: The android device |
| waitForPin: this config needs key or not |
| maxPollingCount: the max polling count |
| Return: |
| the ongoing peer config |
| """ |
| ad_peerConfig = None |
| ad.log.debug("%s is waiting for the ongoing peer, max polling count %s" |
| % (ad.name, maxPollingCount)) |
| while maxPollingCount > 0: |
| ad.droid.requestP2pPeerConfigure() |
| ad_peerConfig = ad.ed.pop_event( |
| p2pconsts.ONGOING_PEER_INFO_AVAILABLE_EVENT, |
| p2pconsts.DEFAULT_TIMEOUT) |
| maxPollingCount -= 1 |
| if is_ongoing_peer_ready(ad_peerConfig, waitForPin): |
| break |
| ad.log.debug("%s is not ready for next step" % (ad.name)) |
| time.sleep(p2pconsts.DEFAULT_POLLING_SLEEPTIME) |
| asserts.assert_true( |
| ad_peerConfig['data'][WifiP2PEnums.WifiP2pConfig.DEVICEADDRESS_KEY], |
| "DUT %s does not receive the request." % (ad.name)) |
| ad.log.debug(ad_peerConfig['data']) |
| return ad_peerConfig |
| |
| #trigger p2p connect to ad2 from ad1 |
| def p2p_connect(ad1, |
| ad2, |
| isReconnect, |
| wpsSetup, |
| p2p_connect_type=p2pconsts.P2P_CONNECT_NEGOTIATION, |
| go_ad=None): |
| """trigger p2p connect to ad2 from ad1 |
| |
| Args: |
| ad1: The android device |
| ad2: The android device |
| isReconnect: boolean, if persist group is exist, |
| isReconnect is true, otherswise is false. |
| wpsSetup: which wps connection would like to use |
| p2p_connect_type: enumeration, which type this p2p connection is |
| go_ad: The group owner android device which is used for the invitation connection |
| """ |
| ad1.log.info("Create p2p connection from %s to %s via wps: %s type %d" % |
| (ad1.name, ad2.name, wpsSetup, p2p_connect_type)) |
| if p2p_connect_type == p2pconsts.P2P_CONNECT_INVITATION: |
| if go_ad is None: |
| go_ad = ad1 |
| find_p2p_device(ad1, ad2) |
| # GO might be another peer, so ad2 needs to find it first. |
| find_p2p_group_owner(ad2, go_ad) |
| elif p2p_connect_type == p2pconsts.P2P_CONNECT_JOIN: |
| find_p2p_group_owner(ad1, ad2) |
| else: |
| find_p2p_device(ad1, ad2) |
| time.sleep(p2pconsts.DEFAULT_SLEEPTIME) |
| wifi_p2p_config = { |
| WifiP2PEnums.WifiP2pConfig.DEVICEADDRESS_KEY: ad2.deviceAddress, |
| WifiP2PEnums.WifiP2pConfig.WPSINFO_KEY: { |
| WifiP2PEnums.WpsInfo.WPS_SETUP_KEY: wpsSetup |
| } |
| } |
| ad1.droid.wifiP2pConnect(wifi_p2p_config) |
| ad1.ed.pop_event(p2pconsts.CONNECT_SUCCESS_EVENT, |
| p2pconsts.DEFAULT_TIMEOUT) |
| if not isReconnect: |
| # ad1 is the initiator, it should be ready soon. |
| ad1_peerConfig = wait_for_ongoing_peer_ready(ad1, |
| wpsSetup == WifiP2PEnums.WpsInfo.WIFI_WPS_INFO_DISPLAY, 6) |
| # auto-join tries 10 times to find groups, and |
| # one round takes 2 - 3 seconds. |
| ad2_peerConfig = wait_for_ongoing_peer_ready(ad2, |
| wpsSetup == WifiP2PEnums.WpsInfo.WIFI_WPS_INFO_KEYPAD, 31) |
| if wpsSetup == WifiP2PEnums.WpsInfo.WIFI_WPS_INFO_DISPLAY: |
| asserts.assert_true( |
| WifiP2PEnums.WpsInfo.WPS_PIN_KEY in ad1_peerConfig['data'][ |
| WifiP2PEnums.WifiP2pConfig.WPSINFO_KEY], |
| "Can't get display pin value") |
| ad2_peerConfig['data'][WifiP2PEnums.WifiP2pConfig.WPSINFO_KEY][ |
| WifiP2PEnums.WpsInfo.WPS_PIN_KEY] = ad1_peerConfig['data'][ |
| WifiP2PEnums.WifiP2pConfig.WPSINFO_KEY][ |
| WifiP2PEnums.WpsInfo.WPS_PIN_KEY] |
| ad2.droid.setP2pPeerConfigure(ad2_peerConfig['data']) |
| ad2.ed.pop_event(p2pconsts.ONGOING_PEER_SET_SUCCESS_EVENT, |
| p2pconsts.DEFAULT_TIMEOUT) |
| ad2.droid.wifiP2pAcceptConnection() |
| elif wpsSetup == WifiP2PEnums.WpsInfo.WIFI_WPS_INFO_KEYPAD: |
| asserts.assert_true( |
| WifiP2PEnums.WpsInfo.WPS_PIN_KEY in ad2_peerConfig['data'][ |
| WifiP2PEnums.WifiP2pConfig.WPSINFO_KEY], |
| "Can't get keypad pin value") |
| ad1_peerConfig['data'][WifiP2PEnums.WifiP2pConfig.WPSINFO_KEY][ |
| WifiP2PEnums.WpsInfo.WPS_PIN_KEY] = ad2_peerConfig['data'][ |
| WifiP2PEnums.WifiP2pConfig.WPSINFO_KEY][ |
| WifiP2PEnums.WpsInfo.WPS_PIN_KEY] |
| ad1.droid.setP2pPeerConfigure(ad1_peerConfig['data']) |
| ad1.ed.pop_event(p2pconsts.ONGOING_PEER_SET_SUCCESS_EVENT, |
| p2pconsts.DEFAULT_TIMEOUT) |
| ad1.droid.wifiP2pAcceptConnection() |
| time.sleep(p2pconsts.DEFAULT_SLEEPTIME) |
| ad2.droid.wifiP2pConfirmConnection() |
| elif wpsSetup == WifiP2PEnums.WpsInfo.WIFI_WPS_INFO_PBC: |
| ad2.droid.wifiP2pAcceptConnection() |
| if p2p_connect_type == p2pconsts.P2P_CONNECT_INVITATION: |
| time.sleep(p2pconsts.DEFAULT_SLEEPTIME) |
| go_ad.droid.wifiP2pAcceptConnection() |
| |
| #wait connected event |
| if p2p_connect_type == p2pconsts.P2P_CONNECT_INVITATION: |
| go_ad.ed.pop_event(p2pconsts.CONNECTED_EVENT, |
| p2pconsts.DEFAULT_TIMEOUT) |
| else: |
| ad1.ed.pop_event(p2pconsts.CONNECTED_EVENT, p2pconsts.DEFAULT_TIMEOUT) |
| ad2.ed.pop_event(p2pconsts.CONNECTED_EVENT, p2pconsts.DEFAULT_TIMEOUT) |
| |
| |
| def p2p_connect_with_config(ad1, ad2, network_name, passphrase, band): |
| """trigger p2p connect to ad2 from ad1 with config |
| |
| Args: |
| ad1: The android device |
| ad2: The android device |
| network_name: the network name of the desired group. |
| passphrase: the passphrase of the desired group. |
| band: the operating band of the desired group. |
| """ |
| ad1.log.info("Create p2p connection from %s to %s" % (ad1.name, ad2.name)) |
| find_p2p_device(ad1, ad2) |
| time.sleep(p2pconsts.DEFAULT_SLEEPTIME) |
| wifi_p2p_config = { |
| WifiP2PEnums.WifiP2pConfig.NETWORK_NAME: network_name, |
| WifiP2PEnums.WifiP2pConfig.PASSPHRASE: passphrase, |
| WifiP2PEnums.WifiP2pConfig.GROUP_BAND: band, |
| WifiP2PEnums.WifiP2pConfig.WPSINFO_KEY: { |
| WifiP2PEnums.WpsInfo.WPS_SETUP_KEY: |
| WifiP2PEnums.WpsInfo.WIFI_WPS_INFO_PBC |
| } |
| } |
| ad1.droid.wifiP2pConnect(wifi_p2p_config) |
| ad1.ed.pop_event(p2pconsts.CONNECT_SUCCESS_EVENT, |
| p2pconsts.DEFAULT_TIMEOUT) |
| time.sleep(p2pconsts.DEFAULT_SLEEPTIME) |
| |
| #wait connected event |
| ad1.ed.pop_event(p2pconsts.CONNECTED_EVENT, p2pconsts.DEFAULT_TIMEOUT) |
| ad2.ed.pop_event(p2pconsts.CONNECTED_EVENT, p2pconsts.DEFAULT_TIMEOUT) |
| |
| |
| def find_p2p_device(ad1, ad2): |
| """Check an Android device ad1 can discover an Android device ad2 |
| |
| Args: |
| ad1: The android device |
| ad2: The android device |
| """ |
| ad1.droid.wifiP2pDiscoverPeers() |
| ad2.droid.wifiP2pDiscoverPeers() |
| p2p_find_result = False |
| ad1.ed.clear_events(p2pconsts.PEER_AVAILABLE_EVENT) |
| while not p2p_find_result: |
| ad1_event = ad1.ed.pop_event(p2pconsts.PEER_AVAILABLE_EVENT, |
| p2pconsts.P2P_FIND_TIMEOUT) |
| ad1.log.debug(ad1_event['data']) |
| p2p_find_result = is_discovered(ad1_event, ad2) |
| asserts.assert_true(p2p_find_result, |
| "DUT didn't discovered peer:%s device" % (ad2.name)) |
| |
| |
| def find_p2p_group_owner(ad1, ad2): |
| """Check an Android device ad1 can discover an Android device ad2 which |
| is a group owner |
| |
| Args: |
| ad1: The android device |
| ad2: The android device which is a group owner |
| """ |
| p2p_find_result = False |
| ad1.ed.clear_events(p2pconsts.PEER_AVAILABLE_EVENT) |
| while not p2p_find_result: |
| ad2.droid.wifiP2pStopPeerDiscovery() |
| ad1.droid.wifiP2pStopPeerDiscovery() |
| ad2.droid.wifiP2pDiscoverPeers() |
| ad1.droid.wifiP2pDiscoverPeers() |
| ad1_event = ad1.ed.pop_event(p2pconsts.PEER_AVAILABLE_EVENT, |
| p2pconsts.P2P_FIND_TIMEOUT) |
| ad1.log.debug(ad1_event['data']) |
| for device in ad1_event['data']['Peers']: |
| if (device['Name'] == ad2.name and int(device['GroupCapability']) |
| & p2pconsts.P2P_GROUP_CAPAB_GROUP_OWNER): |
| ad2.deviceAddress = device['Address'] |
| p2p_find_result = True |
| asserts.assert_true( |
| p2p_find_result, |
| "DUT didn't discovered group owner peer:%s device" % (ad2.name)) |
| |
| |
| def createP2pLocalService(ad, serviceCategory): |
| """Based on serviceCategory to create p2p local service |
| on an Android device ad |
| |
| Args: |
| ad: The android device |
| serviceCategory: p2p local service type, UPNP / IPP / AFP, |
| """ |
| testData = genTestData(serviceCategory) |
| if serviceCategory == p2pconsts.P2P_LOCAL_SERVICE_UPNP: |
| ad.droid.wifiP2pCreateUpnpServiceInfo(testData[0], testData[1], |
| testData[2]) |
| elif (serviceCategory == p2pconsts.P2P_LOCAL_SERVICE_IPP |
| or serviceCategory == p2pconsts.P2P_LOCAL_SERVICE_AFP): |
| ad.droid.wifiP2pCreateBonjourServiceInfo(testData[0], testData[1], |
| testData[2]) |
| ad.droid.wifiP2pAddLocalService() |
| |
| |
| def requestServiceAndCheckResult(ad_serviceProvider, ad_serviceReceiver, |
| serviceType, queryString1, queryString2): |
| """Based on serviceType and query info, check service request result |
| same as expect or not on an Android device ad_serviceReceiver. |
| And remove p2p service request after result check. |
| |
| Args: |
| ad_serviceProvider: The android device which provide p2p local service |
| ad_serviceReceiver: The android device which query p2p local service |
| serviceType: P2p local service type, Upnp or Bonjour |
| queryString1: Query String, NonNull |
| queryString2: Query String, used for Bonjour, Nullable |
| """ |
| expectData = genExpectTestData(serviceType, queryString1, queryString2) |
| find_p2p_device(ad_serviceReceiver, ad_serviceProvider) |
| ad_serviceReceiver.droid.wifiP2pStopPeerDiscovery() |
| ad_serviceReceiver.droid.wifiP2pClearServiceRequests() |
| time.sleep(p2pconsts.DEFAULT_FUNCTION_SWITCH_TIME) |
| |
| ad_serviceReceiver.droid.wifiP2pDiscoverServices() |
| serviceData = {} |
| service_id = 0 |
| if (serviceType == |
| WifiP2PEnums.WifiP2pServiceInfo.WIFI_P2P_SERVICE_TYPE_BONJOUR): |
| ad_serviceReceiver.log.info( |
| "Request bonjour service in \ |
| %s with Query String %s and %s " % |
| (ad_serviceReceiver.name, queryString1, queryString2)) |
| ad_serviceReceiver.log.info("expectData %s" % expectData) |
| if queryString1 != None: |
| service_id = ad_serviceReceiver.droid.wifiP2pAddDnssdServiceRequest( |
| queryString1, queryString2) |
| else: |
| service_id = ad_serviceReceiver.droid.wifiP2pAddServiceRequest( |
| serviceType) |
| ad_serviceReceiver.log.info("request bonjour service id %s" % |
| service_id) |
| ad_serviceReceiver.droid.wifiP2pSetDnsSdResponseListeners() |
| ad_serviceReceiver.droid.wifiP2pDiscoverServices() |
| ad_serviceReceiver.log.info("Check Service Listener") |
| time.sleep(p2pconsts.DEFAULT_SERVICE_WAITING_TIME) |
| try: |
| dnssd_events = ad_serviceReceiver.ed.pop_all(p2pconsts.DNSSD_EVENT) |
| dnssd_txrecord_events = ad_serviceReceiver.ed.pop_all( |
| p2pconsts.DNSSD_TXRECORD_EVENT) |
| dns_service = WifiP2PEnums.WifiP2pDnsSdServiceResponse() |
| for dnssd_event in dnssd_events: |
| if dnssd_event['data'][ |
| 'SourceDeviceAddress'] == ad_serviceProvider.deviceAddress: |
| dns_service.InstanceName = dnssd_event['data'][ |
| p2pconsts.DNSSD_EVENT_INSTANCENAME_KEY] |
| dns_service.RegistrationType = dnssd_event['data'][ |
| p2pconsts.DNSSD_EVENT_REGISTRATIONTYPE_KEY] |
| dns_service.FullDomainName = "" |
| dns_service.TxtRecordMap = "" |
| serviceData[dns_service.toString()] = 1 |
| for dnssd_txrecord_event in dnssd_txrecord_events: |
| if dnssd_txrecord_event['data'][ |
| 'SourceDeviceAddress'] == ad_serviceProvider.deviceAddress: |
| dns_service.InstanceName = "" |
| dns_service.RegistrationType = "" |
| dns_service.FullDomainName = dnssd_txrecord_event['data'][ |
| p2pconsts.DNSSD_TXRECORD_EVENT_FULLDOMAINNAME_KEY] |
| dns_service.TxtRecordMap = dnssd_txrecord_event['data'][ |
| p2pconsts.DNSSD_TXRECORD_EVENT_TXRECORDMAP_KEY] |
| serviceData[dns_service.toString()] = 1 |
| ad_serviceReceiver.log.info("serviceData %s" % serviceData) |
| if len(serviceData) == 0: |
| ad_serviceReceiver.droid.wifiP2pRemoveServiceRequest( |
| service_id) |
| return -1 |
| except queue.Empty as error: |
| ad_serviceReceiver.log.info("dnssd event is empty", ) |
| elif (serviceType == |
| WifiP2PEnums.WifiP2pServiceInfo.WIFI_P2P_SERVICE_TYPE_UPNP): |
| ad_serviceReceiver.log.info( |
| "Request upnp service in %s with Query String %s " % |
| (ad_serviceReceiver.name, queryString1)) |
| ad_serviceReceiver.log.info("expectData %s" % expectData) |
| if queryString1 != None: |
| service_id = ad_serviceReceiver.droid.wifiP2pAddUpnpServiceRequest( |
| queryString1) |
| else: |
| service_id = ad_serviceReceiver.droid.wifiP2pAddServiceRequest( |
| WifiP2PEnums.WifiP2pServiceInfo.WIFI_P2P_SERVICE_TYPE_UPNP) |
| ad_serviceReceiver.droid.wifiP2pSetUpnpResponseListeners() |
| ad_serviceReceiver.droid.wifiP2pDiscoverServices() |
| ad_serviceReceiver.log.info("Check Service Listener") |
| time.sleep(p2pconsts.DEFAULT_SERVICE_WAITING_TIME) |
| try: |
| upnp_events = ad_serviceReceiver.ed.pop_all(p2pconsts.UPNP_EVENT) |
| for upnp_event in upnp_events: |
| if upnp_event['data']['Device'][ |
| 'Address'] == ad_serviceProvider.deviceAddress: |
| for service in upnp_event['data'][ |
| p2pconsts.UPNP_EVENT_SERVICELIST_KEY]: |
| serviceData[service] = 1 |
| ad_serviceReceiver.log.info("serviceData %s" % serviceData) |
| if len(serviceData) == 0: |
| ad_serviceReceiver.droid.wifiP2pRemoveServiceRequest( |
| service_id) |
| return -1 |
| except queue.Empty as error: |
| ad_serviceReceiver.log.info("p2p upnp event is empty", ) |
| |
| ad_serviceReceiver.log.info("Check ServiceList") |
| asserts.assert_true(checkServiceQueryResult(serviceData, expectData), |
| "ServiceList not same as Expect") |
| # After service checked, remove the service_id |
| ad_serviceReceiver.droid.wifiP2pRemoveServiceRequest(service_id) |
| return 0 |
| |
| |
| def requestServiceAndCheckResultWithRetry(ad_serviceProvider, |
| ad_serviceReceiver, |
| serviceType, |
| queryString1, |
| queryString2, |
| retryCount=3): |
| """ allow failures for requestServiceAndCheckResult. Service |
| discovery might fail unexpectedly because the request packet might not be |
| recevied by the service responder due to p2p state switch. |
| |
| Args: |
| ad_serviceProvider: The android device which provide p2p local service |
| ad_serviceReceiver: The android device which query p2p local service |
| serviceType: P2p local service type, Upnp or Bonjour |
| queryString1: Query String, NonNull |
| queryString2: Query String, used for Bonjour, Nullable |
| retryCount: maximum retry count, default is 3 |
| """ |
| ret = 0 |
| while retryCount > 0: |
| ret = requestServiceAndCheckResult(ad_serviceProvider, |
| ad_serviceReceiver, serviceType, |
| queryString1, queryString2) |
| if (ret == 0): |
| break |
| retryCount -= 1 |
| |
| asserts.assert_equal(0, ret, "cannot find any services with retries.") |
| |
| |
| def checkServiceQueryResult(serviceList, expectServiceList): |
| """Check serviceList same as expectServiceList or not |
| |
| Args: |
| serviceList: ServiceList which get from query result |
| expectServiceList: ServiceList which hardcode in genExpectTestData |
| Return: |
| True: serviceList same as expectServiceList |
| False:Exist discrepancy between serviceList and expectServiceList |
| """ |
| tempServiceList = serviceList.copy() |
| tempExpectServiceList = expectServiceList.copy() |
| for service in serviceList.keys(): |
| if service in expectServiceList: |
| del tempServiceList[service] |
| del tempExpectServiceList[service] |
| return len(tempExpectServiceList) == 0 and len(tempServiceList) == 0 |
| |
| |
| def genTestData(serviceCategory): |
| """Based on serviceCategory to generator Test Data |
| |
| Args: |
| serviceCategory: P2p local service type, Upnp or Bonjour |
| Return: |
| TestData |
| """ |
| testData = [] |
| if serviceCategory == p2pconsts.P2P_LOCAL_SERVICE_UPNP: |
| testData.append(p2pconsts.UpnpTestData.uuid) |
| testData.append(p2pconsts.UpnpTestData.serviceType) |
| testData.append([ |
| p2pconsts.UpnpTestData.AVTransport, |
| p2pconsts.UpnpTestData.ConnectionManager |
| ]) |
| elif serviceCategory == p2pconsts.P2P_LOCAL_SERVICE_IPP: |
| testData.append(p2pconsts.IppTestData.ippInstanceName) |
| testData.append(p2pconsts.IppTestData.ippRegistrationType) |
| testData.append(p2pconsts.IppTestData.ipp_txtRecord) |
| elif serviceCategory == p2pconsts.P2P_LOCAL_SERVICE_AFP: |
| testData.append(p2pconsts.AfpTestData.afpInstanceName) |
| testData.append(p2pconsts.AfpTestData.afpRegistrationType) |
| testData.append(p2pconsts.AfpTestData.afp_txtRecord) |
| |
| return testData |
| |
| |
| def genExpectTestData(serviceType, queryString1, queryString2): |
| """Based on serviceCategory to generator expect serviceList |
| |
| Args: |
| serviceType: P2p local service type, Upnp or Bonjour |
| queryString1: Query String, NonNull |
| queryString2: Query String, used for Bonjour, Nullable |
| Return: |
| expectServiceList |
| """ |
| expectServiceList = {} |
| if (serviceType == |
| WifiP2PEnums.WifiP2pServiceInfo.WIFI_P2P_SERVICE_TYPE_BONJOUR): |
| ipp_service = WifiP2PEnums.WifiP2pDnsSdServiceResponse() |
| afp_service = WifiP2PEnums.WifiP2pDnsSdServiceResponse() |
| if queryString1 == p2pconsts.IppTestData.ippRegistrationType: |
| if queryString2 == p2pconsts.IppTestData.ippInstanceName: |
| ipp_service.InstanceName = "" |
| ipp_service.RegistrationType = "" |
| ipp_service.FullDomainName = p2pconsts.IppTestData.ippDomainName |
| ipp_service.TxtRecordMap = p2pconsts.IppTestData.ipp_txtRecord |
| expectServiceList[ipp_service.toString()] = 1 |
| return expectServiceList |
| ipp_service.InstanceName = p2pconsts.IppTestData.ippInstanceName |
| ipp_service.RegistrationType = ( |
| p2pconsts.IppTestData.ippRegistrationType + ".local.") |
| ipp_service.FullDomainName = "" |
| ipp_service.TxtRecordMap = "" |
| expectServiceList[ipp_service.toString()] = 1 |
| return expectServiceList |
| elif queryString1 == p2pconsts.AfpTestData.afpRegistrationType: |
| if queryString2 == p2pconsts.AfpTestData.afpInstanceName: |
| afp_service.InstanceName = "" |
| afp_service.RegistrationType = "" |
| afp_service.FullDomainName = p2pconsts.AfpTestData.afpDomainName |
| afp_service.TxtRecordMap = p2pconsts.AfpTestData.afp_txtRecord |
| expectServiceList[afp_service.toString()] = 1 |
| return expectServiceList |
| ipp_service.InstanceName = p2pconsts.IppTestData.ippInstanceName |
| ipp_service.RegistrationType = ( |
| p2pconsts.IppTestData.ippRegistrationType + ".local.") |
| ipp_service.FullDomainName = "" |
| ipp_service.TxtRecordMap = "" |
| expectServiceList[ipp_service.toString()] = 1 |
| |
| ipp_service.InstanceName = "" |
| ipp_service.RegistrationType = "" |
| ipp_service.FullDomainName = p2pconsts.IppTestData.ippDomainName |
| ipp_service.TxtRecordMap = p2pconsts.IppTestData.ipp_txtRecord |
| expectServiceList[ipp_service.toString()] = 1 |
| |
| afp_service.InstanceName = p2pconsts.AfpTestData.afpInstanceName |
| afp_service.RegistrationType = ( |
| p2pconsts.AfpTestData.afpRegistrationType + ".local.") |
| afp_service.FullDomainName = "" |
| afp_service.TxtRecordMap = "" |
| expectServiceList[afp_service.toString()] = 1 |
| |
| afp_service.InstanceName = "" |
| afp_service.RegistrationType = "" |
| afp_service.FullDomainName = p2pconsts.AfpTestData.afpDomainName |
| afp_service.TxtRecordMap = p2pconsts.AfpTestData.afp_txtRecord |
| expectServiceList[afp_service.toString()] = 1 |
| |
| return expectServiceList |
| elif serviceType == WifiP2PEnums.WifiP2pServiceInfo.WIFI_P2P_SERVICE_TYPE_UPNP: |
| upnp_service = "uuid:" + p2pconsts.UpnpTestData.uuid + "::" + ( |
| p2pconsts.UpnpTestData.rootdevice) |
| expectServiceList[upnp_service] = 1 |
| if queryString1 != "upnp:rootdevice": |
| upnp_service = "uuid:" + p2pconsts.UpnpTestData.uuid + ( |
| "::" + p2pconsts.UpnpTestData.AVTransport) |
| expectServiceList[upnp_service] = 1 |
| upnp_service = "uuid:" + p2pconsts.UpnpTestData.uuid + ( |
| "::" + p2pconsts.UpnpTestData.ConnectionManager) |
| expectServiceList[upnp_service] = 1 |
| upnp_service = "uuid:" + p2pconsts.UpnpTestData.uuid + ( |
| "::" + p2pconsts.UpnpTestData.serviceType) |
| expectServiceList[upnp_service] = 1 |
| upnp_service = "uuid:" + p2pconsts.UpnpTestData.uuid |
| expectServiceList[upnp_service] = 1 |
| |
| return expectServiceList |
| |
| |
| def p2p_create_group(ad): |
| """Create a group as Group Owner |
| |
| Args: |
| ad: The android device |
| """ |
| ad.droid.wifiP2pCreateGroup() |
| ad.ed.pop_event(p2pconsts.CREATE_GROUP_SUCCESS_EVENT, |
| p2pconsts.DEFAULT_TIMEOUT) |
| time.sleep(p2pconsts.DEFAULT_SLEEPTIME) |
| |
| |
| def p2p_create_group_with_config(ad, network_name, passphrase, band): |
| """Create a group as Group Owner |
| |
| Args: |
| ad: The android device |
| """ |
| wifi_p2p_config = { |
| WifiP2PEnums.WifiP2pConfig.NETWORK_NAME: network_name, |
| WifiP2PEnums.WifiP2pConfig.PASSPHRASE: passphrase, |
| WifiP2PEnums.WifiP2pConfig.GROUP_BAND: band, |
| WifiP2PEnums.WifiP2pConfig.WPSINFO_KEY: { |
| WifiP2PEnums.WpsInfo.WPS_SETUP_KEY: |
| WifiP2PEnums.WpsInfo.WIFI_WPS_INFO_PBC |
| } |
| } |
| ad.droid.wifiP2pCreateGroupWithConfig(wifi_p2p_config) |
| ad.ed.pop_event(p2pconsts.CREATE_GROUP_SUCCESS_EVENT, |
| p2pconsts.DEFAULT_TIMEOUT) |
| time.sleep(p2pconsts.DEFAULT_SLEEPTIME) |
| |
| |
| def wifi_p2p_set_channels_for_current_group(ad, listening_chan, |
| operating_chan): |
| """Sets the listening channel and operating channel of the current group |
| created with initialize. |
| |
| Args: |
| ad: The android device |
| listening_chan: Integer, the listening channel |
| operating_chan: Integer, the operating channel |
| """ |
| ad.droid.wifiP2pSetChannelsForCurrentGroup(listening_chan, operating_chan) |
| ad.ed.pop_event(p2pconsts.SET_CHANNEL_SUCCESS_EVENT, |
| p2pconsts.DEFAULT_TIMEOUT) |
| |
| |
| class WifiP2PEnums(): |
| class WifiP2pConfig(): |
| DEVICEADDRESS_KEY = "deviceAddress" |
| WPSINFO_KEY = "wpsInfo" |
| GO_INTENT_KEY = "groupOwnerIntent" |
| NETID_KEY = "netId" |
| NETWORK_NAME = "networkName" |
| PASSPHRASE = "passphrase" |
| GROUP_BAND = "groupOwnerBand" |
| |
| class WpsInfo(): |
| WPS_SETUP_KEY = "setup" |
| BSSID_KEY = "BSSID" |
| WPS_PIN_KEY = "pin" |
| #TODO: remove it from wifi_test_utils.py |
| WIFI_WPS_INFO_PBC = 0 |
| WIFI_WPS_INFO_DISPLAY = 1 |
| WIFI_WPS_INFO_KEYPAD = 2 |
| WIFI_WPS_INFO_LABEL = 3 |
| WIFI_WPS_INFO_INVALID = 4 |
| |
| class WifiP2pServiceInfo(): |
| #TODO: remove it from wifi_test_utils.py |
| # Macros for wifi p2p. |
| WIFI_P2P_SERVICE_TYPE_ALL = 0 |
| WIFI_P2P_SERVICE_TYPE_BONJOUR = 1 |
| WIFI_P2P_SERVICE_TYPE_UPNP = 2 |
| WIFI_P2P_SERVICE_TYPE_VENDOR_SPECIFIC = 255 |
| |
| class WifiP2pDnsSdServiceResponse(): |
| def __init__(self): |
| pass |
| |
| InstanceName = "" |
| RegistrationType = "" |
| FullDomainName = "" |
| TxtRecordMap = {} |
| |
| def toString(self): |
| return self.InstanceName + self.RegistrationType + ( |
| self.FullDomainName + str(self.TxtRecordMap)) |