169 lines
		
	
	
		
			5.9 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			169 lines
		
	
	
		
			5.9 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
| #!/usr/bin/env python3
 | |
| import subprocess
 | |
| import argparse
 | |
| import time
 | |
| import sys
 | |
| 
 | |
def run_ssh_command(ip, command, password="test0000", timeout=300, retries=3):
    """Execute a command on a remote host over SSH using sshpass.

    The command is run as ``root@<ip>`` with strict host-key checking
    disabled. Attempts that time out or exit non-zero are retried, with a
    5 second pause between attempts.

    Args:
        ip: Address of the remote host.
        command: Command line to execute on the remote host.
        password: Password handed to ``sshpass -p``.
        timeout: Per-attempt timeout in seconds.
        retries: Maximum number of attempts.

    Returns:
        A ``(stdout, stderr)`` tuple of stripped strings on success. On
        failure the first element is ``None`` and the second describes the
        problem: the captured stderr, ``"Timeout expired"``, the OS error
        text when the local ``sshpass`` program cannot be started, or
        ``"Failed after retries"`` when no attempt was made at all.
    """
    # Pass an argument list rather than a shell string so that the local
    # shell never interprets the password, the address or the remote command
    # (no quoting problems, no local expansion, no injection).
    ssh_cmd = [
        "sshpass", "-p", str(password),
        "ssh", "-o", "StrictHostKeyChecking=no",
        f"root@{ip}",
        command,
    ]

    for attempt in range(1, retries + 1):
        try:
            result = subprocess.run(
                ssh_cmd,
                capture_output=True,
                text=True,
                check=True,
                timeout=timeout,
            )
        except subprocess.TimeoutExpired:
            print(f"SSH command timed out after {timeout} seconds (attempt {attempt}/{retries}): {command}")
            failure = "Timeout expired"
        except subprocess.CalledProcessError as e:
            print(f"SSH command error (attempt {attempt}/{retries}): {e}, stderr: {e.stderr}")
            failure = e.stderr
        except FileNotFoundError as e:
            # The local sshpass executable is missing; retrying cannot help.
            print(f"SSH command could not be started: {e}")
            return None, str(e)
        else:
            return result.stdout.strip(), result.stderr.strip()

        if attempt == retries:
            return None, failure
        time.sleep(5)  # Wait before retrying

    # Only reached when retries < 1, i.e. the loop body never ran.
    return None, "Failed after retries"
 | |
| 
 | |
| 
 | |
def get_system_info(ip):
    """Print the remote host's ``crossystem`` output and its /etc/lsb-release.

    Each command is run through ``run_ssh_command``; when a command yields no
    output, a failure line containing the returned error text is printed
    instead.
    """
    print(f"\nCollecting system information for {ip}...")

    # (remote command, section heading, prefix of the failure message)
    probes = (
        (
            "crossystem",
            "\n--- crossystem output ---",
            "Failed to retrieve crossystem info: ",
        ),
        (
            "cat /etc/lsb-release",
            "\n--- /etc/lsb-release output ---",
            "Failed to retrieve /etc/lsb-release: ",
        ),
    )

    for remote_cmd, heading, failure_prefix in probes:
        print(heading)
        out, err = run_ssh_command(ip, remote_cmd)
        if not out:
            print(f"{failure_prefix}{err}")
            continue
        print(out)

    print("\n")
 | |
| 
 | |
| 
 | |
def get_last_eventlog_line(ip):
    """Return the last line of /var/log/eventlog.txt on the remote host.

    Returns ``None`` when the remote command produced no output (including
    when it failed).
    """
    last_line, _ = run_ssh_command(ip, "tail -n 1 /var/log/eventlog.txt")
    if not last_line:
        return None
    return last_line
 | |
| 
 | |
def run_firmware_update(ip, timeout=300):
    """Run the firmware updater on the remote host and report whether it succeeded.

    The updater's stdout and stderr are always printed. The run counts as
    successful only when stdout contains the text
    "Firmware updater exits successfully".

    Args:
        ip: Address of the remote host.
        timeout: Timeout in seconds forwarded to ``run_ssh_command``.

    Returns:
        ``True`` on success, ``False`` otherwise.
    """
    updater_cmd = "/usr/local/chromeos-firmwareupdate_joxer_r132 -m factory"
    print(f"Firmware update command: {updater_cmd}")
    out, err = run_ssh_command(ip, updater_cmd, timeout=timeout)

    # Always show what the updater produced, with placeholders for empty streams.
    shown_out = out or "No stdout"
    shown_err = err or "No stderr"
    print(f"\n--- stdout ---\n{shown_out}")
    print(f"\n--- stderr ---\n{shown_err}")

    if out is None:
        print("Firmware update failed: No output received (timeout or error).")
        return False

    succeeded = "Firmware updater exits successfully" in out
    if not succeeded:
        print(f"Firmware update failed: Success message not found. Output: {out}, Error: {err}")
    return succeeded
 | |
| 
 | |
def reboot_device(ip):
    """Reboot the remote host and wait until it answers ping again.

    The reboot command is sent over SSH, then the function sleeps for
    ``wait_time`` seconds before it starts pinging. Pinging is done in up to
    ``max_attempts`` windows of ``wait_time`` seconds each, with one ping
    every ``ping_interval`` seconds after a failed ping.

    Args:
        ip: Address of the remote host.

    Returns:
        ``True`` as soon as a ping succeeds, ``False`` when every window
        elapses without a reply or when ``ping`` cannot be started.
    """
    run_ssh_command(ip, "reboot")
    print(f"Reboot command sent to {ip}. Waiting for device to restart...")

    max_attempts = 3
    # Used twice: as the initial delay before the first ping and as the
    # length of each ping window below.
    wait_time = 60
    ping_interval = 5  # Seconds between ping attempts
    time.sleep(wait_time)

    # Argument list (no shell) so the address is never parsed by a shell.
    ping_cmd = ["ping", "-c", "1", str(ip)]

    for attempt in range(1, max_attempts + 1):
        print(f"Ping attempt {attempt}/{max_attempts}...")
        start_time = time.time()
        while time.time() - start_time < wait_time:
            try:
                subprocess.run(
                    ping_cmd,
                    capture_output=True,
                    text=True,
                    check=True,
                )
            except subprocess.CalledProcessError:
                time.sleep(ping_interval)  # Wait before next ping
            except OSError as e:
                # ping itself could not be executed; waiting will not fix that.
                print(f"Unable to run ping: {e}")
                return False
            else:
                print(f"Device {ip} is back online.")
                return True
        print(f"Ping attempt {attempt} failed.")

    print(f"Device {ip} did not respond to ping after {max_attempts} attempts.")
    return False
 | |
| 
 | |
def main():
    """Run the firmware-update stress test from the command line.

    Command-line arguments:
        --remote: IP address of the DUT (required).
        --loops: Number of test iterations (default 30).

    Each iteration records the last event-log line, runs the firmware
    update, checks that the last event-log line changed, and reboots the
    device. A failing step counts as one failure and ends that iteration;
    a device that does not come back after reboot also ends the whole test.
    A summary is always printed, and the process exits with status 1 when
    any failure was recorded.
    """
    # Parse command-line arguments
    parser = argparse.ArgumentParser(description="Stress test script for firmware update")
    parser.add_argument("--remote", required=True, help="IP address of the DUT")
    parser.add_argument("--loops", type=int, default=30, help="Number of test loops")
    args = parser.parse_args()

    ip = args.remote
    loops = args.loops
    failures = 0
    get_system_info(ip)

    for i in range(1, loops + 1):
        print(f"\nStarting loop {i}/{loops}...")

        # Step 1: Get initial last line of eventlog
        initial_log = get_last_eventlog_line(ip)
        if not initial_log:
            print("Failed to retrieve initial event log. Aborting loop.")
            failures += 1
            continue
        print(f"Initial event log line: {initial_log}")

        # Step 2: Run firmware update
        print("Running firmware update...")
        if not run_firmware_update(ip):
            print("Firmware update failed. Aborting loop.")
            failures += 1
            continue

        # Step 3: Check if event log changed
        print("Checking event log before reboot...")
        time.sleep(10)

        final_log = get_last_eventlog_line(ip)
        if not final_log:
            print("Failed to retrieve final event log. Aborting loop.")
            failures += 1
            continue
        print(f"Final event log line: {final_log}")

        if final_log == initial_log:
            print("Event log did not change after firmware update!")
            failures += 1
        else:
            # Success path: keep going so the device is rebooted and the
            # remaining iterations still run.
            print(f"Event log updated. New line: {final_log}")

        # Step 4: Reboot device
        if not reboot_device(ip):
            # Without a reachable device no further iteration can run.
            print("Device did not come back after reboot. Stopping test.")
            failures += 1
            break

    # Summary
    print(f"\nTest completed. Total loops: {loops}, Failures: {failures}")
    if failures > 0:
        sys.exit(1)


if __name__ == "__main__":
    main()
