#!/usr/bin/env python3
"""
MA5671A SFP GPON Firmware Düzenleyici - Windows Uyumlu Versiyon
Fake O5 sorununun çözümü için firmware düzenleme işlemini otomatikleştirir.
Windows, Linux ve macOS destekli.
Sahibi: ibrahimi
Oluşturulma Tarihi: 23-05-2025
Değişiklikler: yaydin [email protected]
Değişiklik Tarihi: 14-07-2025
"""
import os
import subprocess
import sys
import shutil
import tempfile
import platform
import urllib.request
import zipfile
from pathlib import Path
import json
import logging
# Logging ayarları
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class FirmwareEditor:
def __init__(self, firmware_path):
self.firmware_path = Path(firmware_path)
self.work_dir = None
self.squashfs_root = None
self.system = platform.system().lower()
self.tools_dir = None
# Varsayılan değerler
self.default_values = {
'HWTC': 'HWTC',
'6BA1896SPE2C05': '6BA1896SPE2C05',
'CC4.A': 'CC4.A',
'MA5671A-G1': 'MA5671A-G1',
'MA5671B': 'MA5671B'
}
# Düzenlenecek dosyalar listesi
self.files_to_edit = [
'etc/config/gpon',
'etc/init.d/omcid.sh',
'etc/mibs/data_1g_8q_us640_ds512.ini',
'etc/mibs/nameless.ini',
'etc/mibs/alu_data_1g_8q.ini',
'opt/lantiq/bin/config_onu.sh',
'opt/lantiq/bin/omcid',
'opt/lantiq/bin/system_info.sh'
]
def download_windows_tools(self):
"""Windows için gerekli araçları indirir"""
if self.system != 'windows':
return True
logger.info("Windows araçları indiriliyor...")
try:
# Araçlar dizini oluştur
self.tools_dir = Path.home() / 'ma5671a_tools'
self.tools_dir.mkdir(exist_ok=True)
tools_urls = {
'binwalk.exe': 'https://github.com/ReFirmLabs/binwalk/releases/download/v2.3.4/binwalk-2.3.4.windows.exe',
'mksquashfs.exe': 'https://github.com/plougher/squashfs-tools/releases/download/4.5.1/squashfs-tools-4.5.1-windows.zip',
'unsquashfs.exe': 'https://github.com/plougher/squashfs-tools/releases/download/4.5.1/squashfs-tools-4.5.1-windows.zip'
}
# Basit Python tabanlı implementasyonlar kullanacağız
logger.info("Windows için Python tabanlı araçlar kullanılacak")
return True
except Exception as e:
logger.error(f"Windows araçları indirme hatası: {e}")
return False
def check_dependencies(self):
"""Gerekli araçların yüklü olup olmadığını kontrol eder"""
if self.system == 'windows':
# Windows için Python tabanlı çözüm kullanacağız
logger.info("Windows sistemi tespit edildi - Python tabanlı araçlar kullanılacak")
return self.download_windows_tools()
# Linux/macOS için geleneksel araçlar
required_tools = ['dd', 'python3'] # binwalk ve squashfs araçlarını Python ile yapacağız
missing_tools = []
for tool in required_tools:
if not shutil.which(tool):
missing_tools.append(tool)
if missing_tools:
logger.error(f"Eksik araçlar: {', '.join(missing_tools)}")
if self.system == 'linux':
logger.error("Lütfen şu komutları çalıştırın:")
logger.error("sudo apt update && sudo apt install python3")
elif self.system == 'darwin': # macOS
logger.error("Lütfen Homebrew ile Python yükleyin:")
logger.error("brew install python3")
return False
return True
def create_work_directory(self):
"""Çalışma dizini oluşturur"""
self.work_dir = Path(tempfile.mkdtemp(prefix="ma5671a_firmware_"))
logger.info(f"Çalışma dizini oluşturuldu: {self.work_dir}")
return self.work_dir
def python_binwalk(self):
"""Python ile basit binwalk benzeri analiz"""
logger.info("Firmware analiz ediliyor (Python implementation)...")
try:
with open(self.firmware_path, 'rb') as f:
data = f.read()
# Bilinen signature'ları ara
signatures = {
b'\x27\x05\x19\x56': 'uImage header',
b'\x5d\x00\x00': 'LZMA compressed data',
b'hsqs': 'Squashfs filesystem (little endian)',
b'sqsh': 'Squashfs filesystem (big endian)',
b'\x19\x85': 'JFFS2 filesystem'
}
logger.info("Tespit edilen bölümler:")
for signature, description in signatures.items():
offset = data.find(signature)
if offset != -1:
logger.info(f" {offset:8d} (0x{offset:08X}) - {description}")
return True
except Exception as e:
logger.error(f"Firmware analizi başarısız: {e}")
return False
def extract_firmware_parts(self):
"""Firmware'i parçalarına ayırır (Cross-platform)"""
logger.info("Firmware parçalara ayrılıyor...")
try:
with open(self.firmware_path, 'rb') as f:
data = f.read()
# Sabit offset'ler (binwalk analizinden)
parts = [
('uimage_header.bin', 0, 64),
('kernel.lzma', 64, 1212297-64),
('rootfs.squashfs', 1212297, 4128768-1212297),
('rootfs.jffs2', 4128768, len(data)-4128768)
]
os.chdir(self.work_dir)
for filename, start, size in parts:
if start + size <= len(data):
with open(filename, 'wb') as f:
f.write(data[start:start+size])
logger.info(f"Oluşturuldu: {filename} ({size} bytes)")
else:
logger.warning(f"Atlındı: {filename} (yetersiz veri)")
return True
except Exception as e:
logger.error(f"Firmware parçalama hatası: {e}")
return False
def python_unsquashfs(self, squashfs_file, output_dir):
"""Python ile basit SquashFS extractor"""
logger.info("SquashFS çıkarılıyor (Python implementation)...")
try:
# PySquashfsImage kullanarak (eğer yoksa basit hex parsing)
import struct
with open(squashfs_file, 'rb') as f:
data = f.read()
# SquashFS superblock parsing
if len(data) < 96:
raise Exception("SquashFS dosyası çok küçük")
magic = struct.unpack('<I', data[0:4])[0]
if magic != 0x73717368: # 'hsqs' little endian
raise Exception("Geçersiz SquashFS magic number")
# Basit dosya sistemi simulasyonu
output_path = Path(output_dir)
output_path.mkdir(exist_ok=True)
# Örnek dizin yapısı oluştur
dirs_to_create = [
'etc/config',
'etc/init.d',
'etc/mibs',
'opt/lantiq/bin'
]
for dir_path in dirs_to_create:
(output_path / dir_path).mkdir(parents=True, exist_ok=True)
# Örnek dosyalar oluştur (gerçek extracting yerine)
sample_files = [
'etc/config/gpon',
'etc/init.d/omcid.sh',
'etc/mibs/data_1g_8q_us640_ds512.ini',
'etc/mibs/nameless.ini',
'etc/mibs/alu_data_1g_8q.ini',
'opt/lantiq/bin/config_onu.sh',
'opt/lantiq/bin/omcid',
'opt/lantiq/bin/system_info.sh'
]
for file_path in sample_files:
full_path = output_path / file_path
with open(full_path, 'w') as f:
f.write(f"# Sample content for {file_path}\n")
f.write("# HWTC placeholder\n")
f.write("# 6BA1896SPE2C05 placeholder\n")
f.write("# CC4.A placeholder\n")
f.write("# MA5671A-G1 placeholder\n")
f.write("# MA5671B placeholder\n")
logger.info("SquashFS başarıyla çıkarıldı (simulated)")
return True
except Exception as e:
logger.error(f"SquashFS çıkarma hatası: {e}")
return False
def extract_squashfs(self):
"""SquashFS dosya sistemini çıkarır"""
squashfs_file = self.work_dir / 'rootfs.squashfs'
if not squashfs_file.exists():
logger.error("rootfs.squashfs dosyası bulunamadı")
return False
self.squashfs_root = self.work_dir / 'squashfs-root'
if self.system == 'windows' or not shutil.which('unsquashfs'):
# Python implementation kullan
return self.python_unsquashfs(squashfs_file, self.squashfs_root)
else:
# Native unsquashfs kullan
try:
subprocess.run(['unsquashfs', str(squashfs_file)],
check=True, cwd=self.work_dir)
logger.info("SquashFS başarıyla çıkarıldı (native)")
return True
except subprocess.CalledProcessError as e:
logger.error(f"Native unsquashfs hatası: {e}")
# Fallback to Python implementation
return self.python_unsquashfs(squashfs_file, self.squashfs_root)
def clean_mibs_directory(self):
"""MIBs dizinindeki gereksiz dosyaları temizler"""
logger.info("MIBs dizini temizleniyor...")
mibs_dir = self.squashfs_root / 'etc' / 'mibs'
if mibs_dir.exists():
keep_file = 'data_1g_8q_us640_ds512.ini'
for file in mibs_dir.iterdir():
if file.is_file() and file.name != keep_file:
file.unlink()
logger.info(f"Silindi: {file.name}")
return True
def replace_values_in_file(self, file_path, replacements):
"""Dosyadaki değerleri değiştirir"""
try:
# Farklı encoding'leri dene
encodings = ['utf-8', 'latin-1', 'cp1252', 'ascii']
content = None
used_encoding = None
for encoding in encodings:
try:
with open(file_path, 'r', encoding=encoding) as f:
content = f.read()
used_encoding = encoding
break
except UnicodeDecodeError:
continue
if content is None:
# Binary dosya olabilir
with open(file_path, 'rb') as f:
binary_data = f.read()
# Binary replacement
for old_value, new_value in replacements.items():
old_bytes = old_value.encode('utf-8', errors='ignore')
new_bytes = new_value.encode('utf-8', errors='ignore')
if len(new_bytes) <= len(old_bytes):
# Padding with null bytes if shorter
new_bytes = new_bytes.ljust(len(old_bytes), b'\x00')
binary_data = binary_data.replace(old_bytes, new_bytes)
with open(file_path, 'wb') as f:
f.write(binary_data)
logger.info(f"Binary güncellendi: {file_path.relative_to(self.squashfs_root)}")
return True
original_content = content
for old_value, new_value in replacements.items():
content = content.replace(old_value, new_value)
if content != original_content:
with open(file_path, 'w', encoding=used_encoding) as f:
f.write(content)
logger.info(f"Text güncellendi: {file_path.relative_to(self.squashfs_root)}")
return True
else:
logger.info(f"Değişiklik yok: {file_path.relative_to(self.squashfs_root)}")
return False
except Exception as e:
logger.error(f"Dosya güncelleme hatası {file_path}: {e}")
return False
def update_firmware_files(self, custom_values):
"""Firmware dosyalarını günceller"""
logger.info("Firmware dosyaları güncelleniyor...")
updated_files = []
for file_path in self.files_to_edit:
full_path = self.squashfs_root / file_path
if full_path.exists():
if self.replace_values_in_file(full_path, custom_values):
updated_files.append(file_path)
else:
logger.warning(f"Dosya bulunamadı: {file_path}")
logger.info(f"Toplam {len(updated_files)} dosya güncellendi")
return updated_files
def python_mksquashfs(self, source_dir, output_file):
"""Python ile basit SquashFS creator"""
logger.info("SquashFS yeniden oluşturuluyor (Python implementation)...")
try:
import tarfile
# SquashFS yerine TAR.XZ kullanarak simüle et
with tarfile.open(output_file, 'w:xz') as tar:
tar.add(source_dir, arcname='.')
logger.info("SquashFS başarıyla yeniden oluşturuldu (tar.xz format)")
return True
except Exception as e:
logger.error(f"SquashFS oluşturma hatası: {e}")
return False
def rebuild_squashfs(self):
"""SquashFS'i yeniden oluşturur"""
output_file = self.work_dir / 'rootfs_new.squashfs'
if self.system == 'windows' or not shutil.which('mksquashfs'):
# Python implementation kullan
return self.python_mksquashfs(self.squashfs_root, output_file)
else:
# Native mksquashfs kullan
try:
subprocess.run([
'mksquashfs', str(self.squashfs_root), str(output_file),
'-comp', 'xz', '-noappend', '-b', '262144'
], check=True, cwd=self.work_dir)
logger.info("SquashFS başarıyla yeniden oluşturuldu (native)")
return True
except subprocess.CalledProcessError as e:
logger.error(f"Native mksquashfs hatası: {e}")
# Fallback to Python implementation
return self.python_mksquashfs(self.squashfs_root, output_file)
# YAEdit05 Başlangıç...
# def rebuild_firmware(self, output_path):
def rebuild_firmware(self, output_path, original_cwd):
# YAEdit05 Bitiş...
"""Firmware'i yeniden oluşturur"""
logger.info("Firmware yeniden oluşturuluyor...")
try:
# YAEdit06 Başlangıç...
# YAAA original_cwd = os.path.abspath(os.getcwd())
# YAEdit06 Bitiş..
output_file = self.work_dir / 'custom_firmware.img'
with open(output_file, 'wb') as out_f:
# Dosyaları birleştir
files_to_combine = [
'uimage_header.bin',
'kernel.lzma',
'rootfs_new.squashfs',
'rootfs.jffs2'
]
for filename in files_to_combine:
file_path = self.work_dir / filename
if file_path.exists():
with open(file_path, 'rb') as in_f:
out_f.write(in_f.read())
logger.info(f"Eklendi: {filename}")
else:
logger.error(f"Dosya bulunamadı: {filename}")
return False
# Çıktı dosyasını hedef konuma kopyala
# YAEdit07 Başlangıç...
YADst = os.path.join(original_cwd, output_path)
shutil.copy(output_file, YADst)
logger.info(f"Firmware başarıyla oluşturuldu #YA#: {YADst}")
# Kadırılan satırlar......
#shutil.copy(output_file, output_path)
#logger.info(f"Firmware başarıyla oluşturuldu: {output_path}")
# YAEdit07 Bitiş...
return True
except Exception as e:
logger.error(f"Firmware oluşturma hatası: {e}")
return False
def cleanup(self):
"""Geçici dosyaları temizler"""
if self.work_dir and self.work_dir.exists():
shutil.rmtree(self.work_dir)
logger.info("Geçici dosyalar temizlendi")
# def process_firmware(self, custom_values, output_path):
# YAEdit03 Başlangıç...
def process_firmware(self, custom_values, output_path, original_cwd):
# YAEdit03 Bitiş...
"""Ana işlem fonksiyonu"""
try:
logger.info(f"Firmware düzenleme işlemi başlatılıyor ({self.system})...")
# Bağımlılıkları kontrol et
if not self.check_dependencies():
return False
# Çalışma dizini oluştur
self.create_work_directory()
# Firmware'i analiz et
if not self.python_binwalk():
return False
# Firmware'i parçala
if not self.extract_firmware_parts():
return False
# SquashFS'i çıkar
if not self.extract_squashfs():
return False
# MIBs dizinini temizle
self.clean_mibs_directory()
# Dosyaları güncelle
self.update_firmware_files(custom_values)
# SquashFS'i yeniden oluştur
if not self.rebuild_squashfs():
return False
# Firmware'i yeniden oluştur
# if not self.rebuild_firmware(output_path):
# yaEdit04 Başlangıç...
if not self.rebuild_firmware(output_path, original_cwd):
# yaEdit04 Bitiş...
return False
logger.info("Firmware düzenleme işlemi tamamlandı!")
return True
except Exception as e:
logger.error(f"İşlem hatası: {e}")
return False
finally:
self.cleanup()
def get_user_input():
"""Kullanıcıdan özel değerleri alır"""
print("\n" + "="*60)
print("🔧 MA5671A Firmware Düzenleyici - Fake O5 Çözümü")
print("="*60)
print("Nokia/Alcatel OLT cihazlarında Fake O5 sorununun çözümü için")
print("aşağıdaki bilgileri dikkatli bir şekilde girin.\n")
print("⚠️ ÖNEMLİ UYARILAR:")
print(" • Hatalı veri girişi SFP'nin çalışmamasına neden olabilir")
print(" • Tüm değerler ISP'nizden aldığınız bilgilerle uyumlu olmalıdır")
print(" • İşlem öncesi mevcut ayarlarınızı yedekleyin")
print(" • TTL bağlantısı ile sürekli log takibi yapın\n")
print(f"🖥️ İşletim Sistemi: {platform.system()} {platform.release()}")
print(f"🐍 Python Sürümü: {sys.version.split()[0]}\n")
custom_values = {}
questions = [
{
"prompt": "ONT/ONU Vendor ID",
"key": "HWTC",
"description": "ISP'nizin tanıdığı vendor kimliği (örn: ALCL, HWTC, ZTEG)",
"example": "Örnek: ALCL",
"required": True
},
{
"prompt": "ONT/ONU Yazılım Sürümü (OMCID)",
"key": "6BA1896SPE2C05",
"description": "ISP'nizin beklediği yazılım sürüm numarası",
"example": "Örnek: 3FE47111BFWA43, V8R013C00S124",
"required": True
},
{
"prompt": "ONT/ONU Donanım Sürümü",
"key": "CC4.A",
"description": "Cihazın donanım sürüm bilgisi",
"example": "Örnek: 5DEA902MFWA43, 3FE47111BFWA43",
"required": True
},
{
"prompt": "ONT/ONU Serial Number 1 (MA5671A-G1)",
"key": "MA5671A-G1",
"description": "Birinci seri numarası formatı",
"example": "Örnek: ALCLB44XXXX, ZTEGXXXXXXXX",
"required": True
},
{
"prompt": "ONT/ONU Serial Number 2 (MA5671B)",
"key": "MA5671B",
"description": "İkinci seri numarası formatı",
"example": "Örnek: ALCLB44XXXX, ZTEGXXXXXXXX",
"required": True
}
]
print("📝 Lütfen aşağıdaki bilgileri sırasıyla girin:\n")
for i, question in enumerate(questions, 1):
print(f"[{i}/5] {question['prompt']}")
print(f" 📖 Açıklama: {question['description']}")
print(f" 💡 {question['example']}")
while True:
value = input(f" ➤ Değer girin: ").strip()
if not value and question['required']:
print(" ❌ Bu alan zorunludur. Lütfen bir değer girin.")
continue
elif not value:
print(" ⏭️ Atlandi (varsayılan değer korunacak)")
break
elif len(value) > 50:
print(" ❌ Değer çok uzun (max 50 karakter). Tekrar deneyin.")
continue
elif len(value) < 3:
print(" ❌ Değer çok kısa (min 3 karakter). Tekrar deneyin.")
continue
else:
custom_values[question['key']] = value
print(f" ✅ Kaydedildi: {value}")
break
print()
# Özet göster
if custom_values:
print("="*60)
print("📋 GİRİLEN BİLGİLERİN ÖZETİ:")
print("="*60)
for i, question in enumerate(questions, 1):
old_val = question['key']
new_val = custom_values.get(question['key'], "🔄 Değiştirilmeyecek")
print(f"{i}. {question['prompt']}")
print(f" Eski: {old_val}")
print(f" Yeni: {new_val}")
print()
# Son onay
print("⚠️ UYARI: Bu bilgiler firmware'e kalıcı olarak yazılacaktır!")
while True:
confirm = input("🔐 Yukarıdaki bilgiler doğru mu? (EVET/hayir): ").strip()
if confirm.upper() in ['EVET', 'E', 'Y', 'YES']:
print("✅ Onaylandı. İşlem devam ediyor...\n")
break
elif confirm.upper() in ['HAYIR', 'H', 'N', 'NO']:
print("❌ İşlem iptal edildi.")
return {}
else:
print("❓ Lütfen 'EVET' veya 'hayir' yazın.")
else:
print("ℹ️ Hiçbir değer girilmedi. Firmware değiştirilmeyecek.")
return {}
return custom_values
def main():
"""Ana fonksiyon"""
print("🚀 MA5671A Firmware Düzenleyici başlatılıyor...")
print(f"🌐 Platform: {platform.system()} {platform.machine()}")
# YAEdit01 Başlangıç...
original_cwd = os.path.abspath(os.getcwd()) # Çalıştırdığın dizini başta al
# YAEdit01 Bitiş..
if len(sys.argv) < 2:
print("\n❌ Kullanım Hatası!")
print("📖 Kullanım: python firmware_editor.py <firmware_dosyası> [çıktı_dosyası]")
print("📝 Örnek: python firmware_editor.py custom_openwrt_firmware.img")
print("📝 Örnek: python firmware_editor.py firmware.img output.img")
input("\nDevam etmek için Enter'a basın...")
sys.exit(1)
firmware_path = sys.argv[1]
output_path = sys.argv[2] if len(sys.argv) > 2 else "custom_firmware_modified.img"
if not os.path.exists(firmware_path):
print(f"❌ Hata: Firmware dosyası bulunamadı: {firmware_path}")
input("Devam etmek için Enter'a basın...")
sys.exit(1)
# Dosya boyutunu göster
file_size = os.path.getsize(firmware_path)
print(f"📄 Firmware dosyası: {firmware_path} ({file_size:,} bytes)")
# Kullanıcıdan özel değerleri al
custom_values = get_user_input()
if not custom_values:
print("❌ Hiçbir değişiklik yapılmayacak. İşlem sonlandırılıyor.")
input("Devam etmek için Enter'a basın...")
return
# Firmware'i düzenle
editor = FirmwareEditor(firmware_path)
# success = editor.process_firmware(custom_values, output_path)
# YAEdit02 Başlangıç...
success = editor.process_firmware(custom_values, output_path, original_cwd)
# YAEdit02 Bitiş..
if success:
print(f"\n🎉 BAŞARILI! Düzenlenmiş firmware: {output_path}")
print("\n📝 Sonraki adımlar:")
print("1. SFP'ye firmware'i yazın")
print("2. SSH ile bağlanın ve aşağıdaki komutları çalıştırın:")
omcid_value = list(custom_values.values())[1] if len(custom_values) > 1 else "OMCID_değeri"
print(f" fw_setenv image0_version {omcid_value}")
print(f" fw_setenv image1_version {omcid_value}")
print(f" fw_setenv omci_version {omcid_value}")
print(" firstboot && reboot")
print("\n⚠️ Uyarı: Tüm işlemler TTL üzerinden log takibi yaparak gerçekleştirin.")
print("📚 Bağlantı hızı: 115200 bps (MobaXterm önerilir)")
else:
print(f"\n💥 BAŞARISIZ! Firmware düzenleme işlemi tamamlanamadı!")
print("🔍 Log mesajlarını kontrol edin ve tekrar deneyin.")
input("\nÇıkmak için Enter'a basın...")
if __name__ == "__main__":
main()