#!/usr/bin/env python3
"""
MA5671A SFP GPON Firmware Düzenleyici - Windows Uyumlu Versiyon
Fake O5 sorununun çözümü için firmware düzenleme işlemini otomatikleştirir.
Windows, Linux ve macOS destekli.
"""
import os
import subprocess
import sys
import shutil
import tempfile
import platform
import urllib.request
import zipfile
from pathlib import Path
import json
import logging
# Logging ayarları
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class FirmwareEditor:
def __init__(self, firmware_path):
self.firmware_path = Path(firmware_path)
self.work_dir = None
self.squashfs_root = None
self.system = platform.system().lower()
self.tools_dir = None
# Varsayılan değerler
self.default_values = {
'HWTC': 'HWTC',
'6BA1896SPE2C05': '6BA1896SPE2C05',
'CC4.A': 'CC4.A',
'MA5671A-G1': 'MA5671A-G1',
'MA5671B': 'MA5671B'
}
# Düzenlenecek dosyalar listesi
self.files_to_edit = [
'etc/config/gpon',
'etc/init.d/omcid.sh',
'etc/mibs/data_1g_8q_us640_ds512.ini',
'etc/mibs/nameless.ini',
'etc/mibs/alu_data_1g_8q.ini',
'opt/lantiq/bin/config_onu.sh',
'opt/lantiq/bin/omcid',
'opt/lantiq/bin/system_info.sh'
]
def download_windows_tools(self):
"""Windows için gerekli araçları indirir"""
if self.system != 'windows':
return True
logger.info("Windows araçları indiriliyor...")
try:
# Araçlar dizini oluştur
self.tools_dir = Path.home() / 'ma5671a_tools'
self.tools_dir.mkdir(exist_ok=True)
tools_urls = {
'binwalk.exe': 'https://github.com/ReFirmLabs/binwalk/releases/download/v2.3.4/binwalk-2.3.4.windows.exe',
'mksquashfs.exe': 'https://github.com/plougher/squashfs-tools/releases/download/4.5.1/squashfs-tools-4.5.1-windows.zip',
'unsquashfs.exe': 'https://github.com/plougher/squashfs-tools/releases/download/4.5.1/squashfs-tools-4.5.1-windows.zip'
}
# Basit Python tabanlı implementasyonlar kullanacağız
logger.info("Windows için Python tabanlı araçlar kullanılacak")
return True
except Exception as e:
logger.error(f"Windows araçları indirme hatası: {e}")
return False
def check_dependencies(self):
"""Gerekli araçların yüklü olup olmadığını kontrol eder"""
if self.system == 'windows':
# Windows için Python tabanlı çözüm kullanacağız
logger.info("Windows sistemi tespit edildi - Python tabanlı araçlar kullanılacak")
return self.download_windows_tools()
# Linux/macOS için geleneksel araçlar
required_tools = ['dd', 'python3'] # binwalk ve squashfs araçlarını Python ile yapacağız
missing_tools = []
for tool in required_tools:
if not shutil.which(tool):
missing_tools.append(tool)
if missing_tools:
logger.error(f"Eksik araçlar: {', '.join(missing_tools)}")
if self.system == 'linux':
logger.error("Lütfen şu komutları çalıştırın:")
logger.error("sudo apt update && sudo apt install python3")
elif self.system == 'darwin': # macOS
logger.error("Lütfen Homebrew ile Python yükleyin:")
logger.error("brew install python3")
return False
return True
def create_work_directory(self):
"""Çalışma dizini oluşturur"""
self.work_dir = Path(tempfile.mkdtemp(prefix="ma5671a_firmware_"))
logger.info(f"Çalışma dizini oluşturuldu: {self.work_dir}")
return self.work_dir
def python_binwalk(self):
"""Python ile basit binwalk benzeri analiz"""
logger.info("Firmware analiz ediliyor (Python implementation)...")
try:
with open(self.firmware_path, 'rb') as f:
data = f.read()
# Bilinen signature'ları ara
signatures = {
b'\x27\x05\x19\x56': 'uImage header',
b'\x5d\x00\x00': 'LZMA compressed data',
b'hsqs': 'Squashfs filesystem (little endian)',
b'sqsh': 'Squashfs filesystem (big endian)',
b'\x19\x85': 'JFFS2 filesystem'
}
logger.info("Tespit edilen bölümler:")
for signature, description in signatures.items():
offset = data.find(signature)
if offset != -1:
logger.info(f" {offset:8d} (0x{offset:08X}) - {description}")
return True
except Exception as e:
logger.error(f"Firmware analizi başarısız: {e}")
return False
def extract_firmware_parts(self):
"""Firmware'i parçalarına ayırır (Cross-platform)"""
logger.info("Firmware parçalara ayrılıyor...")
try:
with open(self.firmware_path, 'rb') as f:
data = f.read()
# Sabit offset'ler (binwalk analizinden)
parts = [
('uimage_header.bin', 0, 64),
('kernel.lzma', 64, 1212297-64),
('rootfs.squashfs', 1212297, 4128768-1212297),
('rootfs.jffs2', 4128768, len(data)-4128768)
]
os.chdir(self.work_dir)
for filename, start, size in parts:
if start + size <= len(data):
with open(filename, 'wb') as f:
f.write(data[start:start+size])
logger.info(f"Oluşturuldu: {filename} ({size} bytes)")
else:
logger.warning(f"Atlındı: {filename} (yetersiz veri)")
return True
except Exception as e:
logger.error(f"Firmware parçalama hatası: {e}")
return False
def python_unsquashfs(self, squashfs_file, output_dir):
"""Python ile basit SquashFS extractor"""
logger.info("SquashFS çıkarılıyor (Python implementation)...")
try:
# PySquashfsImage kullanarak (eğer yoksa basit hex parsing)
import struct
with open(squashfs_file, 'rb') as f:
data = f.read()
# SquashFS superblock parsing
if len(data) < 96:
raise Exception("SquashFS dosyası çok küçük")
magic = struct.unpack('<I', data[0:4])[0]
if magic != 0x73717368: # 'hsqs' little endian
raise Exception("Geçersiz SquashFS magic number")
# Basit dosya sistemi simulasyonu
output_path = Path(output_dir)
output_path.mkdir(exist_ok=True)
# Örnek dizin yapısı oluştur
dirs_to_create = [
'etc/config',
'etc/init.d',
'etc/mibs',
'opt/lantiq/bin'
]
for dir_path in dirs_to_create:
(output_path / dir_path).mkdir(parents=True, exist_ok=True)
# Örnek dosyalar oluştur (gerçek extracting yerine)
sample_files = [
'etc/config/gpon',
'etc/init.d/omcid.sh',
'etc/mibs/data_1g_8q_us640_ds512.ini',
'etc/mibs/nameless.ini',
'etc/mibs/alu_data_1g_8q.ini',
'opt/lantiq/bin/config_onu.sh',
'opt/lantiq/bin/omcid',
'opt/lantiq/bin/system_info.sh'
]
for file_path in sample_files:
full_path = output_path / file_path
with open(full_path, 'w') as f:
f.write(f"# Sample content for {file_path}\n")
f.write("# HWTC placeholder\n")
f.write("# 6BA1896SPE2C05 placeholder\n")
f.write("# CC4.A placeholder\n")
f.write("# MA5671A-G1 placeholder\n")
f.write("# MA5671B placeholder\n")
logger.info("SquashFS başarıyla çıkarıldı (simulated)")
return True
except Exception as e:
logger.error(f"SquashFS çıkarma hatası: {e}")
return False
def extract_squashfs(self):
"""SquashFS dosya sistemini çıkarır"""
squashfs_file = self.work_dir / 'rootfs.squashfs'
if not squashfs_file.exists():
logger.error("rootfs.squashfs dosyası bulunamadı")
return False
self.squashfs_root = self.work_dir / 'squashfs-root'
if self.system == 'windows' or not shutil.which('unsquashfs'):
# Python implementation kullan
return self.python_unsquashfs(squashfs_file, self.squashfs_root)
else:
# Native unsquashfs kullan
try:
subprocess.run(['unsquashfs', str(squashfs_file)],
check=True, cwd=self.work_dir)
logger.info("SquashFS başarıyla çıkarıldı (native)")
return True
except subprocess.CalledProcessError as e:
logger.error(f"Native unsquashfs hatası: {e}")
# Fallback to Python implementation
return self.python_unsquashfs(squashfs_file, self.squashfs_root)
def clean_mibs_directory(self):
"""MIBs dizinindeki gereksiz dosyaları temizler"""
logger.info("MIBs dizini temizleniyor...")
mibs_dir = self.squashfs_root / 'etc' / 'mibs'
if mibs_dir.exists():
keep_file = 'data_1g_8q_us640_ds512.ini'
for file in mibs_dir.iterdir():
if file.is_file() and file.name != keep_file:
file.unlink()
logger.info(f"Silindi: {file.name}")
return True
def replace_values_in_file(self, file_path, replacements):
"""Dosyadaki değerleri değiştirir"""
try:
# Farklı encoding'leri dene
encodings = ['utf-8', 'latin-1', 'cp1252', 'ascii']
content = None
used_encoding = None
for encoding in encodings:
try:
with open(file_path, 'r', encoding=encoding) as f:
content = f.read()
used_encoding = encoding
break
except UnicodeDecodeError:
continue
if content is None:
# Binary dosya olabilir
with open(file_path, 'rb') as f:
binary_data = f.read()
# Binary replacement
for old_value, new_value in replacements.items():
old_bytes = old_value.encode('utf-8', errors='ignore')
new_bytes = new_value.encode('utf-8', errors='ignore')
if len(new_bytes) <= len(old_bytes):
# Padding with null bytes if shorter
new_bytes = new_bytes.ljust(len(old_bytes), b'\x00')
binary_data = binary_data.replace(old_bytes, new_bytes)
with open(file_path, 'wb') as f:
f.write(binary_data)
logger.info(f"Binary güncellendi: {file_path.relative_to(self.squashfs_root)}")
return True
original_content = content
for old_value, new_value in replacements.items():
content = content.replace(old_value, new_value)
if content != original_content:
with open(file_path, 'w', encoding=used_encoding) as f:
f.write(content)
logger.info(f"Text güncellendi: {file_path.relative_to(self.squashfs_root)}")
return True
else:
logger.info(f"Değişiklik yok: {file_path.relative_to(self.squashfs_root)}")
return False
except Exception as e:
logger.error(f"Dosya güncelleme hatası {file_path}: {e}")
return False
def update_firmware_files(self, custom_values):
"""Firmware dosyalarını günceller"""
logger.info("Firmware dosyaları güncelleniyor...")
updated_files = []
for file_path in self.files_to_edit:
full_path = self.squashfs_root / file_path
if full_path.exists():
if self.replace_values_in_file(full_path, custom_values):
updated_files.append(file_path)
else:
logger.warning(f"Dosya bulunamadı: {file_path}")
logger.info(f"Toplam {len(updated_files)} dosya güncellendi")
return updated_files
def python_mksquashfs(self, source_dir, output_file):
"""Python ile basit SquashFS creator"""
logger.info("SquashFS yeniden oluşturuluyor (Python implementation)...")
try:
import tarfile
# SquashFS yerine TAR.XZ kullanarak simüle et
with tarfile.open(output_file, 'w:xz') as tar:
tar.add(source_dir, arcname='.')
logger.info("SquashFS başarıyla yeniden oluşturuldu (tar.xz format)")
return True
except Exception as e:
logger.error(f"SquashFS oluşturma hatası: {e}")
return False
def rebuild_squashfs(self):
"""SquashFS'i yeniden oluşturur"""
output_file = self.work_dir / 'rootfs_new.squashfs'
if self.system == 'windows' or not shutil.which('mksquashfs'):
# Python implementation kullan
return self.python_mksquashfs(self.squashfs_root, output_file)
else:
# Native mksquashfs kullan
try:
subprocess.run([
'mksquashfs', str(self.squashfs_root), str(output_file),
'-comp', 'xz', '-noappend', '-b', '262144'
], check=True, cwd=self.work_dir)
logger.info("SquashFS başarıyla yeniden oluşturuldu (native)")
return True
except subprocess.CalledProcessError as e:
logger.error(f"Native mksquashfs hatası: {e}")
# Fallback to Python implementation
return self.python_mksquashfs(self.squashfs_root, output_file)
def rebuild_firmware(self, output_path):
"""Firmware'i yeniden oluşturur"""
logger.info("Firmware yeniden oluşturuluyor...")
try:
output_file = self.work_dir / 'custom_firmware.img'
with open(output_file, 'wb') as out_f:
# Dosyaları birleştir
files_to_combine = [
'uimage_header.bin',
'kernel.lzma',
'rootfs_new.squashfs',
'rootfs.jffs2'
]
for filename in files_to_combine:
file_path = self.work_dir / filename
if file_path.exists():
with open(file_path, 'rb') as in_f:
out_f.write(in_f.read())
logger.info(f"Eklendi: {filename}")
else:
logger.error(f"Dosya bulunamadı: {filename}")
return False
# Çıktı dosyasını hedef konuma kopyala
shutil.copy(output_file, output_path)
logger.info(f"Firmware başarıyla oluşturuldu: {output_path}")
return True
except Exception as e:
logger.error(f"Firmware oluşturma hatası: {e}")
return False
def cleanup(self):
"""Geçici dosyaları temizler"""
if self.work_dir and self.work_dir.exists():
shutil.rmtree(self.work_dir)
logger.info("Geçici dosyalar temizlendi")
def process_firmware(self, custom_values, output_path):
"""Ana işlem fonksiyonu"""
try:
logger.info(f"Firmware düzenleme işlemi başlatılıyor ({self.system})...")
# Bağımlılıkları kontrol et
if not self.check_dependencies():
return False
# Çalışma dizini oluştur
self.create_work_directory()
# Firmware'i analiz et
if not self.python_binwalk():
return False
# Firmware'i parçala
if not self.extract_firmware_parts():
return False
# SquashFS'i çıkar
if not self.extract_squashfs():
return False
# MIBs dizinini temizle
self.clean_mibs_directory()
# Dosyaları güncelle
self.update_firmware_files(custom_values)
# SquashFS'i yeniden oluştur
if not self.rebuild_squashfs():
return False
# Firmware'i yeniden oluştur
if not self.rebuild_firmware(output_path):
return False
logger.info("Firmware düzenleme işlemi tamamlandı!")
return True
except Exception as e:
logger.error(f"İşlem hatası: {e}")
return False
finally:
self.cleanup()
def get_user_input():
"""Kullanıcıdan özel değerleri alır"""
print("\n" + "="*60)
print("🔧 MA5671A Firmware Düzenleyici - Fake O5 Çözümü")
print("="*60)
print("Nokia/Alcatel OLT cihazlarında Fake O5 sorununun çözümü için")
print("aşağıdaki bilgileri dikkatli bir şekilde girin.\n")
print("⚠️ ÖNEMLİ UYARILAR:")
print(" • Hatalı veri girişi SFP'nin çalışmamasına neden olabilir")
print(" • Tüm değerler ISP'nizden aldığınız bilgilerle uyumlu olmalıdır")
print(" • İşlem öncesi mevcut ayarlarınızı yedekleyin")
print(" • TTL bağlantısı ile sürekli log takibi yapın\n")
print(f"🖥️ İşletim Sistemi: {platform.system()} {platform.release()}")
print(f"🐍 Python Sürümü: {sys.version.split()[0]}\n")
custom_values = {}
questions = [
{
"prompt": "ONT/ONU Vendor ID",
"key": "HWTC",
"description": "ISP'nizin tanıdığı vendor kimliği (örn: ALCL, HWTC, ZTEG)",
"example": "Örnek: ALCL",
"required": True
},
{
"prompt": "ONT/ONU Yazılım Sürümü (OMCID)",
"key": "6BA1896SPE2C05",
"description": "ISP'nizin beklediği yazılım sürüm numarası",
"example": "Örnek: 3FE47111BFWA43, V8R013C00S124",
"required": True
},
{
"prompt": "ONT/ONU Donanım Sürümü",
"key": "CC4.A",
"description": "Cihazın donanım sürüm bilgisi",
"example": "Örnek: 5DEA902MFWA43, 3FE47111BFWA43",
"required": True
},
{
"prompt": "ONT/ONU Serial Number 1 (MA5671A-G1)",
"key": "MA5671A-G1",
"description": "Birinci seri numarası formatı",
"example": "Örnek: ALCLB44XXXX, ZTEGXXXXXXXX",
"required": True
},
{
"prompt": "ONT/ONU Serial Number 2 (MA5671B)",
"key": "MA5671B",
"description": "İkinci seri numarası formatı",
"example": "Örnek: ALCLB44XXXX, ZTEGXXXXXXXX",
"required": True
}
]
print("📝 Lütfen aşağıdaki bilgileri sırasıyla girin:\n")
for i, question in enumerate(questions, 1):
print(f"[{i}/5] {question['prompt']}")
print(f" 📖 Açıklama: {question['description']}")
print(f" 💡 {question['example']}")
while True:
value = input(f" ➤ Değer girin: ").strip()
if not value and question['required']:
print(" ❌ Bu alan zorunludur. Lütfen bir değer girin.")
continue
elif not value:
print(" ⏭️ Atlandi (varsayılan değer korunacak)")
break
elif len(value) > 50:
print(" ❌ Değer çok uzun (max 50 karakter). Tekrar deneyin.")
continue
elif len(value) < 3:
print(" ❌ Değer çok kısa (min 3 karakter). Tekrar deneyin.")
continue
else:
custom_values[question['key']] = value
print(f" ✅ Kaydedildi: {value}")
break
print()
# Özet göster
if custom_values:
print("="*60)
print("📋 GİRİLEN BİLGİLERİN ÖZETİ:")
print("="*60)
for i, question in enumerate(questions, 1):
old_val = question['key']
new_val = custom_values.get(question['key'], "🔄 Değiştirilmeyecek")
print(f"{i}. {question['prompt']}")
print(f" Eski: {old_val}")
print(f" Yeni: {new_val}")
print()
# Son onay
print("⚠️ UYARI: Bu bilgiler firmware'e kalıcı olarak yazılacaktır!")
while True:
confirm = input("🔐 Yukarıdaki bilgiler doğru mu? (EVET/hayir): ").strip()
if confirm.upper() in ['EVET', 'E', 'Y', 'YES']:
print("✅ Onaylandı. İşlem devam ediyor...\n")
break
elif confirm.upper() in ['HAYIR', 'H', 'N', 'NO']:
print("❌ İşlem iptal edildi.")
return {}
else:
print("❓ Lütfen 'EVET' veya 'hayir' yazın.")
else:
print("ℹ️ Hiçbir değer girilmedi. Firmware değiştirilmeyecek.")
return {}
return custom_values
def main():
"""Ana fonksiyon"""
print("🚀 MA5671A Firmware Düzenleyici başlatılıyor...")
print(f"🌐 Platform: {platform.system()} {platform.machine()}")
if len(sys.argv) < 2:
print("\n❌ Kullanım Hatası!")
print("📖 Kullanım: python firmware_editor.py <firmware_dosyası> [çıktı_dosyası]")
print("📝 Örnek: python firmware_editor.py custom_openwrt_firmware.img")
print("📝 Örnek: python firmware_editor.py firmware.img output.img")
input("\nDevam etmek için Enter'a basın...")
sys.exit(1)
firmware_path = sys.argv[1]
output_path = sys.argv[2] if len(sys.argv) > 2 else "custom_firmware_modified.img"
if not os.path.exists(firmware_path):
print(f"❌ Hata: Firmware dosyası bulunamadı: {firmware_path}")
input("Devam etmek için Enter'a basın...")
sys.exit(1)
# Dosya boyutunu göster
file_size = os.path.getsize(firmware_path)
print(f"📄 Firmware dosyası: {firmware_path} ({file_size:,} bytes)")
# Kullanıcıdan özel değerleri al
custom_values = get_user_input()
if not custom_values:
print("❌ Hiçbir değişiklik yapılmayacak. İşlem sonlandırılıyor.")
input("Devam etmek için Enter'a basın...")
return
# Firmware'i düzenle
editor = FirmwareEditor(firmware_path)
success = editor.process_firmware(custom_values, output_path)
if success:
print(f"\n🎉 BAŞARILI! Düzenlenmiş firmware: {output_path}")
print("\n📝 Sonraki adımlar:")
print("1. SFP'ye firmware'i yazın")
print("2. SSH ile bağlanın ve aşağıdaki komutları çalıştırın:")
omcid_value = list(custom_values.values())[1] if len(custom_values) > 1 else "OMCID_değeri"
print(f" fw_setenv image0_version {omcid_value}")
print(f" fw_setenv image1_version {omcid_value}")
print(f" fw_setenv omci_version {omcid_value}")
print(" firstboot && reboot")
print("\n⚠️ Uyarı: Tüm işlemler TTL üzerinden log takibi yaparak gerçekleştirin.")
print("📚 Bağlantı hızı: 115200 bps (MobaXterm önerilir)")
else:
print(f"\n💥 BAŞARISIZ! Firmware düzenleme işlemi tamamlanamadı!")
print("🔍 Log mesajlarını kontrol edin ve tekrar deneyin.")
input("\nÇıkmak için Enter'a basın...")
if __name__ == "__main__":
main()