AI/광인사플젝

[머신러닝]4.치매예측 인공지능 - 신호처리/데이터 전처리

살랑춤춰요 2024. 8. 1. 14:16

전자공학을 전공했고, 신호처리에 관해 여러 공부를 해왔습니다. 어디에 써먹을까 했는데 여기에 사용하게 되어서 기쁩니다. 하지만 신호처리의 과정만 다루고 정확한 수식적 해석은 여기선 다루지 않겠습니다. (양이 너무많습니다.)

 

# Raw data >> nirs_procssing 처리 작업 순서
  - 01. 웨이블릿 디노이징
  - 02. 5차 버터 로스 패스 필터 적용 or Low Pass Filter 적용
  - 03. 광학밀도 변환 - np.log10( mean(ch) / ch)
  - 04. 상대적 헤모글로빈 농도 변환 -  (OD730*e850_deoxy-OD850*e730_deoxy)/(sep_L*e_c)


# nirs_procssing >> c_nirs_processing
    cNIRS_HbO_constant = np.dot(HbO, ch7_HbO) / np.dot(HbO, ch7_HbO)
    cNIRS_Hb_constant = np.dot(Hb, ch7_Hb) / np.dot(Hb, ch7_Hb)
    if cNIRS_HbO_constant>0:
        cHbO = HbO - ch7_HbO * float(cNIRS_HbO_constant)
    else:
        cHbO = HbO
    if cNIRS_Hb_constant >0:
        cHb = Hb - ch7_Hb * float(cNIRS_Hb_constant)
    else:
        cHb = Hb
  

#1. 필터처리와 웨이블릿 변환 적용

- 웨이블릿변환을 사용한 이유 : 시간에 따른 신호의 변화량을 파악하기 위해서 사용합니다.

- 이전 페이지에서 결론에 설명했듯 높은 신호값이 많을수록 정상에 가깝고 낮은 신호값이 많을수록 알츠하이머에 가깝기 때문에 변화량을 파악해야합니다.

def butter_lowpass_filter(data, cutoff, fs, order):
    normal_cutoff = 2 * cutoff/fs
    # Get the filter coefficients
    b, a = butter(order, normal_cutoff, btype='low', analog=False)
    y = filtfilt(b,a,data)
    return y
 
def lowpassfilter(signal, thresh = 0, wavelet="db4"):
    thresh = thresh*np.nanmax(signal)
    coeff = pywt.wavedec(signal, wavelet, mode="per" )
    coeff[1:] = (pywt.threshold(i, value=thresh, mode="soft" ) for i in coeff[1:])
    reconstructed_signal = pywt.waverec(coeff, wavelet, mode="per" )
    if len(signal) != len(reconstructed_signal) :
        reconstructed_signal = reconstructed_signal[1:]

    return reconstructed_signal[:]

wav_df_98 = pd.DataFrame()

for idx in tqdm(df_98.uid.unique()) :

    uid_1 = df_98[df_98.uid==idx]
    col_lst = uid_1.columns[2:-1].tolist()
    print("columns lst : {}".format(col_lst), end='\r')
    for j in col_lst :
        ###################################################################################################
        uid_1[j] = denoise_wavelet(uid_1[j].values, wavelet='db8',
                                   mode='soft', wavelet_levels=5, method='BayesShrink', rescale_sigma='True')
        ###################################################################################################
        uid_1[j] = butter_lowpass_filter(uid_1[j].values, 0.15, 3, 5)

    wav_df_98 = pd.concat([wav_df_98,uid_1])
wav_df_98 = wav_df_98.reset_index(drop=True)

uids = 14

plt.title("98 Wavelet Denoise Data - User name : {} -  730nm,850nm - 4Channel".format(uids),fontsize=20)
wav_df_98[wav_df_98.uid==uids].A_730_3.plot(figsize=(12,5))
wav_df_98[wav_df_98.uid==uids].A_850_3.plot(figsize=(12,5))

 

LPF 의 한 종류인 butterworth filter(5차)와 웨이블릿 변환을 적용 후 출력된 신호형태입니다.

 

###################################################################################################
        uid_1[j] = denoise_wavelet(uid_1[j].values, wavelet='db8',
                                   mode='soft', wavelet_levels=5, method='BayesShrink', rescale_sigma='True')   ###################################################################################################

 

        uid_1[j] = butter_lowpass_filter(uid_1[j].values, 0.15, 3, 5)


이 부분에서 denoise_wavelet 을 먼저 실시하고 filter 을 적용시켰는데

filter 을 먼저 적용하고 denoise_wavelet 을 실시한것과 차이가 있습니다.

<차이점>

1. wavelet -> filter : 신호를 세분화하여 주파수 대역별로 필터링 진행

2. filter -> wavelet : 특정 주파수 대역의 성분을 세분화 진행.

 

wav_df = pd.DataFrame()

for idx in tqdm(df_98.uid.unique()) :

    uid_1 = df_98[df_98.uid==idx]
    col_lst = uid_1.columns[2:-1].tolist()

    for j in col_lst :
        uid_1[j] = lowpassfilter(uid_1[j].values, 0.27)

    wav_df = pd.concat([wav_df,uid_1])

 

LPF 를 사용하면 이렇게 바뀝니다. 하지만 이번장에선 butterworth filter 을 기준으로 설명할 예정입니다.

 

#2. 데이터 광학밀도 변환(광필터)

 

광학 밀도 변환 (optical density)
광도에 의한 광학 밀도 :
OD = log10(I_0/I_t)

평균 전송 강도에 의한 광학 밀도 변화 :
delta_OD = -log10(I_T/I_average)
평균 광학에 대한 광학 밀도 변화의 수치 배열
형상의 각 채널의 밀도 또는 기준(예: 데이터)

 

#2-1. 광학 밀도 변환 (optical density)

  1. 광학 밀도 계산:
    • 광학 밀도(OD)는 광학 측정을 통해 얻은 신호 강도를 기준 신호 강도로 나누고 로그 변환을 통해 계산됩니다. 광학 밀도 변환의 공식은 다음과 같습니다: OD=log⁡10(I0/I) 여기서 I0는 기준 신호 강도, I는 측정된 신호 강도입니다.
  2. Baseline 보정:
    • 데이터의 첫 100개의 값을 평균하여 baseline을 설정하고, 나머지 데이터에 대해 이를 기준으로 보정합니다.
    • 100번째 이후의 데이터는 첫 100개의 평균 값으로 설정되며, 첫 100개 데이터는 누적 평균 값을 사용합니다.
  3. 헤모글로빈 값 계산:
    • 변환된 OD 값을 사용하여 산소화 헤모글로빈 (HbO) 및 탈산소화 헤모글로빈 (HbR) 값을 Beer-Lambert 법칙을 적용하여 계산합니다.
od_df_98 = pd.DataFrame()

for idx in tqdm(wav_df_98.uid.unique()) :

    raw = wav_df_98[wav_df_98.uid==idx].reset_index(drop=True)

    lst_730 = raw.columns[2:9].tolist()   #730 , 1~7채널
    lst_850 = raw.columns[9:16].tolist()  #850 , 1~7채널

    for jdx in lst_730 :

        ch         = jdx.split("_")[-1]
        ch730_name = "A_730_{}".format(ch)
        ch850_name = "A_850_{}".format(ch)
        print('process columns : {}, {}'.format(ch730_name,ch850_name), end='\r')
        data730 = raw[ch730_name]
        data850 = raw[ch850_name]

        base730 = data730.copy()
        base850 = data850.copy()

        base730[base730.index > 100] = np.mean(base730[:100])
        for ix in base730.index[:100].tolist() :
            base730[ix] = float(np.mean(data730[0:ix+1]))

        base850[base850.index > 100] = np.mean(base850[:100])
        for ix in base850.index[:100].tolist() :
            base850[ix] = float(np.mean(data850[0:ix+1]))

        raw[ch730_name] = np.log10(base730/data730)
        raw[ch850_name] = np.log10(base850/data850)


    od_df_98 = pd.concat([od_df_98,raw])

od_df_98 = od_df_98.reset_index(drop=True)
 
e730_oxy = 0.1096 * 4
e730_deoxy = 0.3257 * 4
e850_oxy = 0.2899 * 4
e850_deoxy = 0.1965 * 4

e_c = (e730_oxy*e850_deoxy-e850_oxy*e730_deoxy)

separation_730 = np.array([4, 3.5, 3, 3.5, 4, 3, 1])
separation_850 = np.array([4, 3.5, 3, 3.5, 4, 3, 1])

hb_df_98 = pd.DataFrame()

for idx in od_df_98.uid.unique() :

    od = od_df_98[od_df_98.uid==idx].reset_index(drop=True)

    lst_730 = raw.columns[2:9].tolist()   #730 , 1~7채널
    lst_850 = raw.columns[9:16].tolist()  #850 , 1~7채널

    for jdx in lst_730 :

        ch         = jdx.split("_")[-1]

        ch730_name = "A_730_{}".format(ch)
        ch850_name = "A_850_{}".format(ch)

        hbo_name = "hbo_{}".format(str(int(ch)+1))
        hbr_name = "hbr_{}".format(str(int(ch)+1))

        OD730 = od[ch730_name]
        OD850 = od[ch850_name]

        hbo = (OD730*e850_deoxy-OD850*e730_deoxy)/(e_c * separation_850[int(ch)])
        hbr = (OD850*e730_oxy-OD730*e850_oxy)/(e_c * separation_850[int(ch)])
        od[ch730_name] = hbo
        od[ch850_name] = hbr

        od.rename(columns = {ch730_name:hbo_name, ch850_name:hbr_name}, inplace = True )
    hb_df_98 = pd.concat([hb_df_98,od])
hb_df_98 = hb_df_98.reset_index(drop=True)

 

#2-2. 평균 전송 강도에 의한 광학 밀도 변환(델타 광학밀도변환)

델타 광학 밀도 계산:

  • 델타 광학 밀도(ΔOD)는 측정된 신호 강도의 절대값을 기준으로 평균 강도를 나누고 로그 변환을 통해 계산됩니다. 델타 OD 변환의 공식은 다음과 같습니다: ΔOD=−log⁡10(I / mean(I))
new_df = pd.DataFrame()

for idx in tqdm(wav_df.uid.unique()) :

    uid_1 = wav_df[wav_df.uid==idx]
    uid1 = uid_1[uid_1.columns[2:-1]]
    intensities = uid1
    means      = np.mean(np.absolute(intensities), axis = 1)
    means      = np.expand_dims(means, axis = 1)
    delta_od   = -np.log10(np.absolute(intensities)/means)
    lst_730  = delta_od.columns[:7].tolist()
    lst_850  = delta_od.columns[7:].tolist()

    hb_df  = pd.DataFrame()

    for i in range(len(lst_730)) :

        red_mes_data = np.reshape(
            delta_od[lst_730[i]].values, (delta_od[lst_730[i]].values.shape[0], 1))
        ir_mes_data = np.reshape(
            delta_od[lst_850[i]].values, (delta_od[lst_850[i]].values.shape[0], 1))
        mes_data_shape = ir_mes_data.shape

        oxy_red = 2.1838
        oxy_ir = 7.0146*1.355
        dxy_red = 10.2470
        dxy_ir = 6.8501*1.355

        mean_baseline_red = np.mean(red_mes_data[0:100])
        mean_baseline_ir = np.mean(ir_mes_data[0:100])

        pos = np.where(
            red_mes_data*mean_baseline_red > 0)

        a_red = np.array([
                math.log(mean_baseline_red/i[0]) if idx in pos[0] else 0 \
                for idx, i in enumerate(red_mes_data)
            ])
        pos = np.where(
            ir_mes_data*mean_baseline_ir > 0
            )
        a_ir = np.array([
                math.log(mean_baseline_ir/i[0]) if idx in pos[0] else 0 \
                for idx, i in enumerate(ir_mes_data)
            ])

        hb = np.zeros(mes_data_shape)
        hbo = np.zeros(mes_data_shape)
        hbt = np.zeros(mes_data_shape)

        ####### Oxy Hb #######
        if ((oxy_red*dxy_ir - oxy_ir*dxy_red)!=0):
            hbo = (a_red*dxy_ir - a_ir*dxy_red)/(oxy_red*dxy_ir - oxy_ir*dxy_red)

        ####### DeOxy Hb #######
        if ((dxy_red*oxy_ir - dxy_ir*oxy_red)!=0):
            hb = (a_red*oxy_ir - a_ir*oxy_red)/(dxy_red*oxy_ir - dxy_ir*oxy_red)
            # hbr 과 같은의미

        hbt = hb+hbo

        ch_name = str(int(lst_730[i].split('_')[-1]) +1)

        hb_df['hbo_'+ ch_name] = hbo
        hb_df['hbr_'+ ch_name] = hb
        hb_df['hbt_'+ ch_name] = hbt
#     hb_df[['hbo_1','hbo_2','hbo_3','hbo_4','hbo_5','hbo_'
#           ,'hbo_1']].plot(figsize=(15, 6),title="유저 {} ".format(idx))

    hb_df.insert(0,'uid',idx)
    hb_df.insert(22, 'label', uid_1.Category.unique()[0] )

    new_df = pd.concat([new_df,hb_df])

 

#3. 보정처리

 

  • 상수 계산 및 보정:
    • 각 채널에 대해 cNIRS_HbO_constant와 cNIRS_Hb_constant를 계산하여 보정된 HbO 및 HbR 값을 얻습니다.
    • 보정된 값은 원래 값에서 기준 채널(hbo_7, hbr_7) 값을 상수만큼 곱한 값을 뺀 것입니다.
  • 데이터프레임 업데이트:
    • 보정된 값(cHbO, cHb)을 새로운 데이터프레임 chb_df에 저장합니다.
  • 새로운 변수 lr01 생성:
    • pd1부터 pd6까지의 값은 각 채널 간의 HbO와 HbR의 차이입니다.
    • lr01은 (pd1 + pd2 + pd3) - (pd4 + pd5 + pd6)로 계산되며, 특정 채널 그룹 간의 차이를 나타냅니다.
    • dc_lr01은 lr01에서 평균 값을 뺀 값으로, DC 오프셋을 제거한 값입니다.
    • mean_chb는 모든 채널 간의 평균 차이를 나타냅니다.

 

  • 신호의 정확한 분석:
    • DC 오프셋이 있는 신호는 평균값이 0이 아니며, 이로 인해 신호의 실제 변동을 정확하게 분석하기 어려워집니다. 오프셋을 제거하면 신호의 평균값을 0으로 맞출 수 있어 신호의 변동성을 더 명확하게 볼 수 있습니다.
  • 잡음 및 왜곡 감소:
    • DC 오프셋이 존재하면 후속 신호 처리 단계에서 추가적인 잡음이나 왜곡을 유발할 수 있습니다. 예를 들어, 필터링 과정에서 DC 성분이 의도하지 않은 결과를 초래할 수 있습니다.
  • 주파수 분석:
    • 주파수 도메인에서 신호를 분석할 때, DC 오프셋은 0Hz 성분으로 나타납니다. 이는 실제로 관심 있는 신호의 주파수 성분을 분석하는 데 방해가 됩니다. 오프셋을 제거하면 주파수 분석의 정확성이 향상됩니다.
  • 신호 처리 기법의 효율성:
    • 많은 신호 처리 알고리즘은 입력 신호가 0 중심으로 분포할 때 최적으로 동작합니다. DC 오프셋이 있는 신호는 이러한 알고리즘의 성능을 저하시키고, 결과의 신뢰성을 떨어뜨릴 수 있습니다.

 

chb_df_98 = pd.DataFrame()

for idx in hb_df_98.uid.unique() :

    chb_df = pd.DataFrame()

    uid1  = hb_df_98[hb_df_98.uid==idx]

    hbo_lst  = uid1[uid1.columns[2:8]].columns
    hbr_lst  = uid1[uid1.columns[9:15]].columns

    for jdx in range(len(hbo_lst)) :

        cNIRS_HbO_constant = np.dot(uid1[hbo_lst[jdx]], uid1.hbo_7) / np.dot(uid1[hbo_lst[jdx]], uid1.hbo_7)
        cNIRS_Hb_constant  = np.dot(uid1[hbr_lst[jdx]], uid1.hbr_7) / np.dot(uid1[hbr_lst[jdx]], uid1.hbr_7)

        if cNIRS_HbO_constant>0:
            cHbO = uid1[hbo_lst[jdx]] - uid1.hbo_7 * float(cNIRS_HbO_constant)
        else :
            cHbO = uid1[hbo_lst[jdx]]
        if cNIRS_Hb_constant >0:
            cHb = uid1[hbr_lst[jdx]] - uid1.hbr_7 * float(cNIRS_Hb_constant)
        else :
            cHb = uid1[hbr_lst[jdx]]

        chb_df['c' + hbo_lst[jdx]] = cHbO[:3400]
        chb_df['c' + hbr_lst[jdx]] = cHb[:3400]

    chb_df.insert(0,'uid', idx)
    chb_df = chb_df[['uid','chbo_1','chbo_2','chbo_3','chbo_4','chbo_5','chbo_6'
                     ,'chbr_1','chbr_2','chbr_3','chbr_4','chbr_5','chbr_6']]
    chb_df['Category'] = uid1.Category.unique()[0]
    chb_df_98 = pd.concat([chb_df_98, chb_df])

chb_df_98 = chb_df_98.reset_index(drop=True)

pd1= chb_df_98.chbo_1 - chb_df_98.chbr_1
pd2= chb_df_98.chbo_2 - chb_df_98.chbr_2
pd3= chb_df_98.chbo_3 - chb_df_98.chbr_3
pd4= chb_df_98.chbo_4 - chb_df_98.chbr_4
pd5= chb_df_98.chbo_5 - chb_df_98.chbr_5
pd6= chb_df_98.chbo_6 - chb_df_98.chbr_6

lr01 = (pd1 + pd2 + pd3) - (pd4 + pd5 + pd6)

chb_df_98.insert(13, 'lr01', lr01)
chb_df_98.insert(14, 'dc_lr01', lr01 - np.mean(lr01))  # remove DC OFFSET
chb_df_98.insert(15, 'mean_chb', (pd1 + pd2 + pd3 +pd4 + pd5 + pd6) / 6)

uids = 14

plt.title("cHb Data - User name : {} -  cHbo,cHbr - 4Channel".format(uids),fontsize=20)
chb_df_98[chb_df_98.uid==uids].chbo_4.plot(figsize=(12,5))
chb_df_98[chb_df_98.uid==uids].chbr_4.plot(figsize=(12,5))

 

- dc offset 제거 전후 비교 plot

chb_df_98[chb_df_98.uid==1].dc_lr01.plot(figsize=(12,5))
chb_df_98[chb_df_98.uid==1].lr01.plot(figsize=(12,5))

 

 

#4. nan 값 제거 후 최종 신호(헤모글로빈 농도값) plot

# NaN값이 있는 유저 확인 후 제거

null_uid = chb_df_98[chb_df_98.lr01.isna()].uid.unique()
print('98 - Nan user Index : {}'.format(null_uid))
for i in null_uid :
      chb_df_98 = chb_df_98[chb_df_98.uid != i]

null_uid = chb_df_35[chb_df_35.lr01.isna()].uid.unique()
print('35 - Nan user Index : {}'.format(null_uid))
for i in null_uid :
      chb_df_35 = chb_df_35[chb_df_35.uid != i]
 
uids = 14

plt.title("cHb Data - User name : {} -  lr01".format(uids),fontsize=20)
chb_df_98[chb_df_98.uid==uids].lr01.plot(figsize=(12,5))

 

 

<결론>

- 신호처리를 위해 무수한 필터를 사용한다.

- 이 페이지에선 butterworth filter 을 사용했으며, 신호가 둥글게 변환된다.

- 광학밀도변환 - np.log10(mean(ch) / ch) 을 사용하였음.

- 상대적 헤모글로빈 농도 변환 - (OD730*e850_deoxy-OD850*e730_deoxy)/(sep_L*e_c) 을 사용하였음.

 

Q. 왜 Low Pass Filter 을 사용하였나요?

A. fNIRS 신호가 저주파수 대역의 신호라서 사용했습니다.

 

Q. 다른 필터는 사용안했나요?

A. BFP, kalman filter, gaussian filter 모두 사용했습니다만,

BFP는 cutoff feq 값의 설정이 애매모호합니다. 생체신호의 일반적인 노이즈는

- 호흡 진동 : 0.3Hz
- 심장 진동 : 0.89Hz
- 동맥혈압 진동 : 0.14Hz

이 존재하지만 사람마다 진동이 다르기 때문에 설정해주기 어려웠습니다.

 

kalman filter 는 시간에 따른 변화량이 특이하게 변하는 생체신호상 적용하면 노이즈가 있는 신호처럼 출력되어 사용하지 못했습니다.

 

gaussian filter 를 사용했을 때, 특정 신호가 아예 날아가버리는 경우가 생겨서 사용하지 않았습니다.

 

(코드에서 사용한 수식들은 논문에서 제시한 공식을 바탕으로 만들어진 수식입니다.)