pro3_control_panel/base/floader_handler.py
2025-12-15 09:23:52 +08:00

647 lines
28 KiB
Python

#! /usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2024 Realtek Semiconductor Corp.
# SPDX-License-Identifier: Apache-2.0
import time
import ctypes
from .sense_status import *
from .device_info import *
from .next_op import *
from .flash_utils import *
BAUDSET = 0x81
QUERY = 0x02
CONFIG = 0x83
WRITE = 0x84
READ = 0x05
CHKSM = 0x06
SENSE = 0x07
NEXTOP = 0x08
FS_ERASE = 0xA0
FS_RDSTS = 0x21
FS_WTSTS = 0xA2
FS_MKBAD = 0xA3
FS_CHKMAP = 0x24
FS_CHKBAD = 0x25
FS_CHKBLK = 0x26
OTP_RRAW = 0x40
OTP_WRAW = 0xC1
OTP_RMAP = 0x42
OTP_WMAP = 0xC3
ACK_BUF_EMPTY = 0xB0
ACK_BUF_FULL = 0xB1
NP_EXIT = 0x01
NP_BAUDRATE_RECOVER = 0x02
SOF = 0xA5
QUERY_DATA_OFFSET_DID = 0
QUERY_DATA_OFFSET_IMAGE_TYPE = 2
QUERY_DATA_OFFSET_CMD_SET_VERSION = 4
QUERY_DATA_OFFSET_MEMORY_TYPE = 6
QUERY_DATA_OFFSET_FLASH_MID = 7
QUERY_DATA_OFFSET_FLASH_DID = 8
QUERY_DATA_OFFSET_FLASH_MFG = 10
QUERY_DATA_OFFSET_FLASH_MODEL = 22
QUERY_DATA_OFFSET_FLASH_PAGE_SIZE = 42
QUERY_DATA_OFFSET_FLASH_OOB_SIZE = 46
QUERY_DATA_OFFSET_FLASH_PAGES_PER_BLOCK = 48
QUERY_DATA_OFFSET_FLASH_BLOCKS_PER_LUN = 52
QUERY_DATA_OFFSET_FLASH_LUNS_PER_TARGET = 56
QUERY_DATA_OFFSET_FLASH_MAX_BAD_BLOCKS_PER_LUN = 57
QUERY_DATA_OFFSET_FLASH_REQ_HOST_ECC_LEVEL = 59
QUERY_DATA_OFFSET_FLASH_TARGETS = 60
QUERY_DATA_OFFSET_FLASH_CAPACITY = 61
QUERY_DATA_OFFSET_WIFI_MAC = 65
# Read otp logical map time:26ms, physical map time: 170ms
# Program otp logical map time: 7ms
OTP_READ_TIMEOUT_IN_SECONDS = 10
class FloaderHandler(object):
def __init__(self, ameba_obj):
self.ameba = ameba_obj
self.serial_port = ameba_obj.serial_port
self.profile = ameba_obj.profile_info
self.logger = ameba_obj.logger
self.setting = ameba_obj.setting
super().__init__()
def send_request(self, request, length, timeout, is_sync=True):
ret = ErrType.SYS_UNKNOWN
response_bytes = None
len_l = length & 0xFF
len_h = (length >> 8) & 0xFF
frame_data = [(SOF)]
frame_data.append(len_l)
frame_data.append(len_h)
frame_data.append((len_l ^ len_h) & 0xFF)
frame_bytes = bytearray(frame_data)
frame_bytes += request[:length]
checksum = sum(request)
frame_bytes += (checksum & 0xFF).to_bytes(1, byteorder="little")
try:
retry = 0
while retry < self.setting.request_retry_count:
retry += 1
self.serial_port.flushInput()
self.serial_port.flushOutput()
self.ameba.write_bytes(frame_bytes)
self.logger.debug(f"Request: len={length}, payload={request.hex()}")
ret, ret_byte = self.ameba.read_bytes(timeout)
if ret != ErrType.OK:
self.logger.error(f"Response error: {ret}, timeout:{timeout}")
continue
if is_sync:
if ret_byte[0] == SOF:
ret, ret_bytes = self.ameba.read_bytes(timeout, size=3)
if ret == ErrType.OK:
len_l = ret_bytes[0]
len_h = ret_bytes[1]
len_xor = ret_bytes[2]
response_len = (len_h << 8) + len_l
if len_xor == (len_l ^ len_h):
ret, response_bytes = self.ameba.read_bytes(self.setting.async_response_timeout_in_second,
size=response_len + 1)
if ret == ErrType.OK:
if response_len >= len(response_bytes) - 1:
self.logger.debug(
f"Response: len={response_len}, payload={response_bytes.hex()}")
checksm = sum(response_bytes[:response_len]) % 256
if checksm == response_bytes[response_len]:
self.logger.debug(f"Checksum={checksm}({hex(checksm)}), ok")
break
else:
self.logger.debug(
f"Response checksum error: expect {hex(checksm)}, get {hex(response_bytes[response_len + 1])}")
ret = ErrType.SYS_CHECKSUM
else:
ret = ErrType.SYS_PROTO
self.logger.debug(
f"Response length error: expect {len_xor}, get {len_l ^ len_h}")
else:
self.logger.debug(f"Read response payload error: {ret}")
else:
ret = ErrType.SYS_PROTO
self.logger.debug(f"Response length check error: {ret_bytes.hex()}")
else:
ret = ErrType.SYS_PROTO
self.logger.error(f"Read response length error: {ret}")
elif ret_byte[0] >= ErrType.DEV_ERR_BASE.value:
ret = ret_byte
self.logger.debug(f"Negative response 0x{ret_byte.hex()}: ")
if ret_byte[0] == ErrType.DEV_FULL.value:
time.sleep(self.setting.request_retry_interval_second)
else:
ret = ErrType.SYS_PROTO
self.logger.debug(f"Unexpected response opcode {ret_byte.hex()}")
else:
if ret_byte[0] == ACK_BUF_FULL:
self.logger.debug(f"ACK: Rx buffer full, wait {self.setting.request_retry_interval_second}s")
time.sleep(self.setting.request_retry_interval_second)
ret = ErrType.OK
elif ret_byte[0] == ACK_BUF_EMPTY:
self.logger.debug(f"Response: ACK")
ret = ErrType.OK
break
elif ret_byte[0] >= ErrType.DEV_ERR_BASE.value:
ret = ret_byte
self.logger.debug(f"Negative response: {ret_byte}")
if ret_byte[0] == ErrType.DEV_FULL.value or ErrType.DEV_BUSY.value:
time.sleep(self.setting.request_retry_interval_second)
else:
ret = ErrType.SYS_PROTO
self.logger.debug(f"Unexpected response {ret_byte}")
if ret == ErrType.OK or ret == ErrType.SYS_CANCEL:
break
else:
time.sleep(self.setting.request_retry_interval_second)
except Exception as err:
self.logger.debug(f"Response exception: {err}")
ret = ErrType.SYS_IO
return ret, response_bytes
def sense(self, timeout, op_code=None, data=None):
self.logger.debug(f"Sense...")
ret, sense_ack = self.send_request(SENSE.to_bytes(1, byteorder="little"), length=1, timeout=timeout)
if ret == ErrType.OK:
sense_status = SenseStatus()
self.logger.debug(f"Sense response raw data: {sense_ack.hex()}")
if sense_ack[0] == (SENSE):
ret = sense_status.parse(sense_ack, 1)
if ret == ErrType.OK:
self.logger.debug(
f"Sense response: opcode={hex(sense_status.op_code)}, status=0x{format(sense_status.status, '02x')}, data={hex(sense_status.data)}")
if sense_status.status != ErrType.OK.value:
ret = sense_status.status
self.logger.warning(
f"Sense fail: opcode={hex(sense_status.op_code)}, data={sense_status.data}, status={ret}")
elif (op_code is not None) and (sense_status.op_code != op_code):
ret = ErrType.SYS_PROTO
self.logger.error(
f"Sense protocol error: expect opcode-data {op_code.hex()}-{hex(data)}, get {hex(sense_status.op_code)}-{hex(sense_status.data)}")
else:
if (data is not None) and sense_status.data != data:
self.logger.debug(
f"Sense protocol warning: opcode {op_code} expect data {data}, get {sense_status.data}, ignored")
self.logger.debug("Sense ok")
else:
self.logger.debug(f"Sense fail to parse sense response")
else:
self.logger.debug(f"Sense fail: unexpected opcode {sense_ack[0]}")
else:
self.logger.debug(f"Sense fail: {ret}")
return ret, sense_ack
def handshake(self, baudrate):
ret = ErrType.SYS_UNKNOWN
self.logger.debug(f"Floader handshake start")
try:
if self.serial_port.baudrate != baudrate and self.setting.switch_baudrate_at_floader == 1:
baudset = [BAUDSET]
baudset.extend(list(baudrate.to_bytes(4, byteorder="little")))
self.logger.debug(f"BAUDSET {baudrate}")
baudset_bytes = bytearray(baudset)
ret, _ = self.send_request(baudset_bytes, len(baudset_bytes), self.setting.async_response_timeout_in_second,
is_sync=False)
if ret != ErrType.OK:
self.logger.debug(f"BAUDSET fail: {ret}")
return ret
ret = self.ameba.switch_baudrate(baudrate, self.setting.baudrate_switch_delay_in_second, True)
if ret != ErrType.OK:
self.logger.error(f"Fail to switch baudrate to {baudrate}: {ret}")
return ret
for retry in range(3):
if retry > 0:
self.logger.debug(f"Sense retry {retry}#")
else:
self.logger.debug("Sense")
ret, resp = self.sense(self.setting.sync_response_timeout_in_second)
if ret == ErrType.OK:
break
else:
self.logger.debug(f"Sense failed: {ret}")
time.sleep(self.setting.request_retry_interval_second)
continue
if ret == ErrType.OK:
self.logger.debug(f"Floader handshake done")
else:
self.logger.error(f"Floader handshake timeout")
except Exception as err:
self.logger.error(f"Floader handshake exception: {err}")
return ret
def query(self):
device_info = DeviceInfo()
self.logger.debug(f"QUERY...")
ret, resp = self.send_request(QUERY.to_bytes(1, byteorder="little"), length=1,
timeout=self.setting.sync_response_timeout_in_second)
if ret == ErrType.OK:
if resp[0] == (QUERY):
device_info.did = resp[QUERY_DATA_OFFSET_DID + 1] + (resp[QUERY_DATA_OFFSET_DID + 2] << 8)
device_info.image_type = resp[QUERY_DATA_OFFSET_IMAGE_TYPE + 1] + (
resp[QUERY_DATA_OFFSET_IMAGE_TYPE + 2] << 8)
device_info.cmd_set_version = resp[QUERY_DATA_OFFSET_CMD_SET_VERSION + 1] + (
resp[QUERY_DATA_OFFSET_CMD_SET_VERSION + 2] << 8)
device_info.wifi_mac = resp[QUERY_DATA_OFFSET_WIFI_MAC + 1: QUERY_DATA_OFFSET_WIFI_MAC + 1 + 6]
device_info.memory_type = resp[QUERY_DATA_OFFSET_MEMORY_TYPE + 1]
device_info.flash_mid = resp[QUERY_DATA_OFFSET_FLASH_MID + 1]
device_info.flash_did = resp[QUERY_DATA_OFFSET_FLASH_DID + 1] + (
resp[QUERY_DATA_OFFSET_FLASH_DID + 2] << 8)
device_info.flash_mfg = resp[
QUERY_DATA_OFFSET_FLASH_MFG + 1: QUERY_DATA_OFFSET_FLASH_MFG + 1 + 12].decode("utf-8")
device_info.flash_model = resp[
QUERY_DATA_OFFSET_FLASH_MODEL + 1: QUERY_DATA_OFFSET_FLASH_MODEL + 1 + 20].decode("utf-8")
device_info.flash_page_size = (resp[QUERY_DATA_OFFSET_FLASH_PAGE_SIZE + 1]
+ (resp[QUERY_DATA_OFFSET_FLASH_PAGE_SIZE + 2] << 8)
+ (resp[QUERY_DATA_OFFSET_FLASH_PAGE_SIZE + 3] << 16)
+ (resp[QUERY_DATA_OFFSET_FLASH_PAGE_SIZE + 4] << 24))
device_info.flash_oob_size = (resp[QUERY_DATA_OFFSET_FLASH_OOB_SIZE + 1] + (
resp[QUERY_DATA_OFFSET_FLASH_OOB_SIZE + 2] << 8))
device_info.flash_pages_per_block = (resp[QUERY_DATA_OFFSET_FLASH_PAGES_PER_BLOCK + 1]
+ (resp[QUERY_DATA_OFFSET_FLASH_PAGES_PER_BLOCK + 2] << 8)
+ (resp[QUERY_DATA_OFFSET_FLASH_PAGES_PER_BLOCK + 3] << 16)
+ (resp[QUERY_DATA_OFFSET_FLASH_PAGES_PER_BLOCK + 4] << 24))
device_info.flash_blocks_per_lun = (resp[QUERY_DATA_OFFSET_FLASH_BLOCKS_PER_LUN + 1]
+ (resp[QUERY_DATA_OFFSET_FLASH_BLOCKS_PER_LUN + 2] << 8)
+ (resp[QUERY_DATA_OFFSET_FLASH_BLOCKS_PER_LUN + 3] << 16)
+ (resp[QUERY_DATA_OFFSET_FLASH_BLOCKS_PER_LUN + 4] << 24))
device_info.flash_luns_per_target = resp[QUERY_DATA_OFFSET_FLASH_LUNS_PER_TARGET + 1]
device_info.flash_max_bad_block_per_lun = (resp[QUERY_DATA_OFFSET_FLASH_MAX_BAD_BLOCKS_PER_LUN + 1]
+ (resp[
QUERY_DATA_OFFSET_FLASH_MAX_BAD_BLOCKS_PER_LUN + 2] << 8))
device_info.flash_req_host_ecc_level = resp[QUERY_DATA_OFFSET_FLASH_REQ_HOST_ECC_LEVEL + 1]
device_info.flash_targets = resp[QUERY_DATA_OFFSET_FLASH_TARGETS + 1]
device_info.flash_capacity = (resp[QUERY_DATA_OFFSET_FLASH_CAPACITY + 1]
+ (resp[QUERY_DATA_OFFSET_FLASH_CAPACITY + 2] << 8)
+ (resp[QUERY_DATA_OFFSET_FLASH_CAPACITY + 3] << 16)
+ (resp[QUERY_DATA_OFFSET_FLASH_CAPACITY + 4] << 24))
else:
self.logger.debug(f"Query: unexpected response: {resp[0]}")
ret = ErrType.SYS_PROTO
return ret, device_info
def config(self, configs):
config_data = [(CONFIG)]
config_data.append(configs[0][0])
config_data.append(configs[0][1])
config_data.append(configs[0][2])
config_data.append(configs[0][3])
config_data.append(configs[0][4])
config_data.append(configs[0][5])
config_data.append(configs[0][6])
config_data.append(configs[0][7])
config_data.append(configs[1][0])
config_data.append(configs[1][1])
config_data.append(configs[1][2])
config_data.append(configs[1][3])
config_data.append(configs[1][4])
config_data.append(configs[1][5])
config_data.append(configs[1][6])
config_data.append(configs[1][7])
request_bytes = bytearray(config_data)
self.logger.debug(f"CONFIG: {configs[0].hex()} {configs[1].hex()}")
ret, _ = self.send_request(request_bytes, len(request_bytes), self.setting.async_response_timeout_in_second, is_sync=False)
return ret
def next_operation(self, opcode, operand):
request_data = [NEXTOP]
request_data.append((opcode.value) & 0xFF)
request_data.extend(list(operand.to_bytes(4, byteorder="little")))
self.logger.debug(f"NEXTOTP: opecode={opcode}, operand={operand}")
request_bytes = bytearray(request_data)
ret, _ = self.send_request(request_bytes, len(request_bytes), self.setting.sync_response_timeout_in_second, is_sync=False)
return ret
def reset_in_download_mode(self):
self.logger.debug(f"Reset in download mode")
return self.next_operation(NextOpType.REBURN, 0)
def write(self, mem_type, src, size, addr, timeout, need_sense=False):
sense_status = SenseStatus()
write_data = [WRITE]
write_data.append(mem_type&0xFF)
write_data.extend(list(addr.to_bytes(4, byteorder="little")))
write_array = bytearray(write_data)
write_array += src[:size]
self.logger.debug(f"WRITE: addr={hex(addr)}, size={size}, mem_type={mem_type}, need_sense={need_sense}")
ret, _ = self.send_request(write_array, len(write_array), self.setting.write_response_timeout_in_second, is_sync=False)
if ret == ErrType.OK:
if need_sense:
ret, sense_ack = self.sense(timeout, op_code=WRITE, data=addr)
if ret != ErrType.OK:
self.logger.error(f"WRITE addr={hex(addr)} fail: {ret}")
return ret
def read(self, mem_type, addr, size, timeout):
resp = None
read_data = [READ]
read_data.append((mem_type & 0xFF))
read_data.extend(list(addr.to_bytes(4, byteorder="little")))
read_data.extend(list(size.to_bytes(4, byteorder="little")))
self.logger.debug(f"READ: addr={hex(addr)}, size={size}, mem_type={mem_type}")
read_bytes = bytearray(read_data)
ret, resp_ack = self.send_request(read_bytes, len(read_bytes), timeout)
if ret == ErrType.OK:
if resp_ack[0] == READ:
resp = resp_ack[1:]
else:
self.logger.debug(f"READ got unexpected response {hex(resp_ack[0])}")
ret = ErrType.SYS_PROTO
else:
self.logger.debug(f"READ fail: {ret}")
return ret
def checksum(self, mem_type, start_addr, end_addr, size, timeout):
chk_rest = 0
request_data = [CHKSM]
request_data.append((mem_type & 0xFF))
request_data.extend(list(start_addr.to_bytes(4, byteorder='little')))
request_data.extend(list(end_addr.to_bytes(4, byteorder='little')))
request_data.extend(list(size.to_bytes(4, byteorder='little')))
self.logger.debug(f"CHKSM: start={hex(start_addr)}, end={hex(end_addr)}, size={size}, mem_type={mem_type}")
request_bytes = bytearray(request_data)
ret, resp = self.send_request(request_bytes, len(request_bytes), timeout)
if ret == ErrType.OK:
if resp[0] == int(CHKSM):
chk_rest = resp[1] + (resp[2] << 8) + (resp[3] << 16) + (resp[4] << 24)
self.logger.debug(f"CHKSM: result={hex(chk_rest)}")
else:
self.logger.debug(f"CHKSM: unexpected response {resp[0]}")
ret = ErrType.SYS_PROTO
else:
self.logger.error(f"CHKSM fail: {ret}")
return ret, chk_rest
def erase_flash(self, mem_type, start_addr, end_addr, size, timeout, sense=False, force=False):
self.logger.debug(f"Erase flash: start_addr={hex(start_addr)}, end_addr={hex(end_addr)} size={size}")
request_data = [FS_ERASE]
request_data.append((mem_type & 0xFF))
request_data.append(1 if force else 0)
request_data.extend(list(start_addr.to_bytes(4, byteorder="little")))
request_data.extend(list((end_addr & 0xFFFFFFFF).to_bytes(4, byteorder="little")))
request_data.extend(list((size & 0xFFFFFFFF).to_bytes(4, byteorder="little")))
if force:
self.logger.warning(f"FS_ERASE: start_addr={hex(start_addr)}, end_addr={hex(end_addr)}, size={size}, mem_type={mem_type} force")
else:
self.logger.debug(f"FS_ERASE: start_addr={hex(start_addr)}, end_addr={hex(end_addr)}, size={size}, mem_type={mem_type}")
request_bytes = bytearray(request_data)
ret, _ = self.send_request(request_bytes, len(request_bytes), self.setting.async_response_timeout_in_second, is_sync=False)
if ret != ErrType.OK:
self.logger.warning(f"FS_ERASE start_addr={hex(start_addr)}, end_addr={hex(end_addr)}, size={size}, force={force}, fail:{ret}")
return ret
if sense:
ret, sense_status = self.sense(timeout, op_code=FS_ERASE, data=start_addr)
if ret != ErrType.OK:
self.logger.error(f"FS_ERASE start_addr={hex(start_addr)}, size={size} force={force} fail: {ret}")
return ret
def read_status_register(self, cmd, address):
status = None
request_data = [FS_RDSTS]
request_data.append(cmd)
request_data.append(address)
self.logger.debug(f"FS_RDSTS: cmd={cmd}, address={address}")
request_bytes = bytearray(request_data)
ret, resp = self.send_request(request_bytes, len(request_bytes), self.setting.sync_response_timeout_in_second)
if ret == ErrType.OK:
if resp[0] == FS_RDSTS:
status = resp[1]
self.logger.debug(f"FS_RDSTS: status={status}")
else:
self.logger.debug(f"FS_RDSTS: got unexpected response, {resp[0]}")
ret = ErrType.SYS_PROTO
else:
self.logger.debug(f"FS_RDSTS failed: {ret}")
return ret, status
def write_status_register(self, cmd, addr, value):
request_data = [FS_WTSTS]
request_data.append(cmd)
request_data.append(addr)
request_data.append(value)
self.logger.debug(f"FS_WTSTS: cmd={hex(cmd)}, addr={hex(addr)}, value={hex(value)}")
request_bytes = bytearray(request_data)
ret, _ = self.send_request(request_bytes, len(request_bytes), self.setting.async_response_timeout_in_second, is_sync=False)
return ret
def mark_bad_block(self, address):
request_data = [FS_MKBAD]
request_data.extend(list(address.to_bytes(4, byteorder="little")))
request_bytes = bytearray(request_data)
self.logger.debug(f"FS_MKBAD: addr={format(address, '08x')}")
return self.send_request(request_bytes, len(request_bytes), self.setting.async_response_timeout_in_second, is_sync=False)
def check_bad_block(self, address):
ret = ErrType.OK
status = None
request_data = [FS_CHKBAD]
request_data.extend(list(address.to_bytes(4, byteorder="little")))
request_bytes = bytearray(request_data)
self.logger.debug(f"FS_CHKBAD: addr={format(address, '08x')}")
ret, resp = self.send_request(request_bytes, len(request_bytes), self.setting.sync_response_timeout_in_second)
if ret == ErrType.OK:
if resp[0] == FS_CHKBAD:
status = resp[1]
else:
self.logger.debug(f"FS_CHKBAD got unexpected response {hex(resp[0])}")
ret = ErrType.SYS_PROTO
else:
self.logger.debug(f"FS_CHKBAD fail: {ret}")
return ret, status
def check_block_status(self, address):
ret = ErrType.OK
block_status = 0
page_status = [0, 0]
request_data = [FS_CHKBLK]
request_data.extend(list(address.to_bytes(4, byteorder="little")))
request_bytes = bytearray(request_data)
self.logger.debug(f"FS_CHKBLK: addr={format(address, '08x')}")
ret, resp = self.send_request(request_bytes, len(request_bytes), self.setting.sync_response_timeout_in_second)
if ret == ErrType.OK:
if resp[0] == FS_CHKBLK:
block_status = resp[1]
page_status[0] = ctypes.c_uint64(resp[2] + (resp[3] << 8) + (resp[4] << 16) +
(resp[5] << 24) + (resp[6] << 32) + (resp[7] << 40) +
(resp[8] << 48) + (resp[9] << 56)).value
page_status[1] = ctypes.c_uint64(resp[10] + (resp[11] << 8) + (resp[12] << 16) +
(resp[13] << 24) + (resp[14] << 32) + (resp[15] << 40) +
(resp[16] << 48) + (resp[17] << 56)).value
self.logger.debug(
f"FS_CHKBLK: block_status={hex(block_status)}, page status={format(page_status[0], '16x')} {format(page_status[1], '16x')}")
else:
self.logger.debug(f"FS_CHKBLK fail: {ret}")
return ret, block_status
def check_map_status(self, address):
ret = ErrType.OK
status = 0
request_data = [FS_CHKMAP]
request_data.extend(list(address.to_bytes(4, byteorder="little")))
request_bytes = bytearray(request_data)
self.logger.debug(f"FS_CHKMAP: addr={format(address, '08x')}")
ret, resp = self.send_request(request_bytes, len(request_bytes), self.setting.sync_response_timeout_in_second)
if ret == ErrType.OK:
if resp[0] == FS_CHKMAP:
status = (resp[1] + (resp[2] << 8) + (resp[3] << 16) + (resp[4] << 24)) & 0xFFFFFFFF
self.logger.debug(f"FS_CHKMAP: status={format(status, '08x')}")
else:
self.logger.debug(f"FS_CHKMAP fail: {ret}")
return ret, status
def otp_read_map(self, cmd, address, size):
request_data = [cmd]
request_data.extend(list(address.to_bytes(4, byteorder="little")))
request_data.extend(list(size.to_bytes(4, byteorder="little")))
request_bytes = bytearray(request_data)
ret, buf = self.send_request(request_bytes, len(request_bytes), OTP_READ_TIMEOUT_IN_SECONDS)
if ret == ErrType.OK:
if buf[0] == cmd:
self.logger.debug(f"Otp read: {buf[0]}")
return ret, buf[1:-1]
else:
self.logger.debug(f"Otp read fail: unexpected response {buf[0]}")
ret = ErrType.SYS_PROTO
return ret, buf
def otp_write_map(self, cmd, address, size, data):
request_data = [cmd]
request_data.extend(list(address.to_bytes(4, byteorder="little")))
request_data.extend(list(size.to_bytes(4, byteorder="little")))
for idx in range(size):
request_data.append(data[address + idx])
request_bytes = bytearray(request_data)
ret, _ = self.send_request(request_bytes, len(request_bytes), self.setting.async_response_timeout_in_second, is_sync=False)
return ret
def otp_read_physical_map(self, address, size):
self.logger.debug(f"OTP_RRAW: addr={hex(address)}, size={size}")
ret, buf = self.otp_read_map(OTP_RRAW, address, size)
if ret == ErrType.OK:
self.logger.debug(f"OTP_RRAW: {buf}")
self.logger.debug("OTP_RRAW done")
else:
self.logger.debug(f"OTP_RRAW fail: {ret}")
return ret, buf
def otp_write_physical_map(self, address, size, data):
self.logger.debug(f"OTP_WRAW: addr={hex(address)}, size={size}, data={data}")
ret = self.otp_write_map(OTP_WMAP, address, size, data)
if ret == ErrType.OK:
self.logger.debug("OTP_WRAW done")
else:
self.logger.debug(f"OTP_WRAW fail: {ret}")
return ret
def otp_read_logical_map(self, address, size):
self.logger.debug(f"OTP_RMAP: addr={hex(address)}, size={size}")
ret, buf = self.otp_read_map(OTP_RMAP, address, size)
if ret == ErrType.OK:
self.logger.debug(f"OTP_RMAP: {buf}")
self.logger.debug("OTP_RMAP done")
else:
self.logger.debug(f"OTP_RMAP fail: {ret}")
return ret, buf
def otp_write_logical_map(self, address, size, data):
self.logger.debug(f"OTP_WMAP: addr={hex(address)}, size={size}, data={data}")
ret = self.otp_write_map(OTP_WMAP, address, size, data)
if ret == ErrType.OK:
self.logger.debug("OTP_WMAP done")
else:
self.logger.debug(f"OTP_WMAP fail: {ret}")
return ret