전자공학을 전공했고, 신호처리에 관해 여러 공부를 해왔습니다. 어디에 써먹을까 했는데 여기에 사용하게 되어서 기쁩니다. 하지만 신호처리의 과정만 다루고 정확한 수식적 해석은 여기선 다루지 않겠습니다. (양이 너무많습니다.)
# Raw data >> nirs_procssing 처리 작업 순서 - 01. 웨이블릿 디노이징 - 02. 5차 버터 로스 패스 필터 적용 or Low Pass Filter 적용 - 03. 광학밀도 변환 - np.log10( mean(ch) / ch) - 04. 상대적 헤모글로빈 농도 변환 - (OD730*e850_deoxy-OD850*e730_deoxy)/(sep_L*e_c)
# nirs_procssing >> c_nirs_processing cNIRS_HbO_constant = np.dot(HbO, ch7_HbO) / np.dot(HbO, ch7_HbO) cNIRS_Hb_constant = np.dot(Hb, ch7_Hb) / np.dot(Hb, ch7_Hb) if cNIRS_HbO_constant>0: cHbO = HbO - ch7_HbO * float(cNIRS_HbO_constant) else: cHbO = HbO if cNIRS_Hb_constant >0: cHb = Hb - ch7_Hb * float(cNIRS_Hb_constant) else: cHb = Hb
#1. 필터처리와 웨이블릿 변환 적용
- 웨이블릿변환을 사용한 이유 : 시간에 따른 신호의 변화량을 파악하기 위해서 사용합니다.
- 이전 페이지에서 결론에 설명했듯 높은 신호값이 많을수록 정상에 가깝고 낮은 신호값이 많을수록 알츠하이머에 가깝기 때문에 변화량을 파악해야합니다.
def butter_lowpass_filter ( data , cutoff , fs , order ) :
normal_cutoff = 2 * cutoff/fs
# Get the filter coefficients
b , a = butter ( order , normal_cutoff , btype= 'low' , analog= False )
y = filtfilt ( b , a , data )
return y
def lowpassfilter ( signal , thresh = 0 , wavelet = "db4" ) :
thresh = thresh*np.nanmax ( signal )
coeff = pywt.wavedec ( signal , wavelet , mode= "per" )
coeff [ 1 :] = ( pywt.threshold ( i , value=thresh , mode= "soft" ) for i in coeff [ 1 :])
reconstructed_signal = pywt.waverec ( coeff , wavelet , mode= "per" )
if len ( signal ) != len ( reconstructed_signal ) :
reconstructed_signal = reconstructed_signal [ 1 :]
return reconstructed_signal [:]
wav_df_98 = pd.DataFrame ()
for idx in tqdm ( df_98.uid.unique ()) :
uid_1 = df_98 [ df_98.uid==idx ]
col_lst = uid_1.columns [ 2 : -1 ] .tolist ()
print ( "columns lst : {}" . format ( col_lst ), end= '\r' )
for j in col_lst :
###################################################################################################
uid_1 [ j ] = denoise_wavelet ( uid_1 [ j ] .values , wavelet= 'db8' ,
mode= 'soft' , wavelet_levels= 5 , method= 'BayesShrink' , rescale_sigma= 'True' )
###################################################################################################
uid_1 [ j ] = butter_lowpass_filter ( uid_1 [ j ] .values , 0.15 , 3 , 5 )
wav_df_98 = pd.concat ([ wav_df_98 , uid_1 ])
wav_df_98 = wav_df_98.reset_index ( drop= True )
uids = 14
plt.title ( "98 Wavelet Denoise Data - User name : {} - 730nm,850nm - 4Channel" . format ( uids ), fontsize= 20 )
wav_df_98 [ wav_df_98.uid==uids ] .A_730_3.plot ( figsize= ( 12 , 5 ))
wav_df_98 [ wav_df_98.uid==uids ] .A_850_3.plot ( figsize= ( 12 , 5 ))
LPF 의 한 종류인 butterworth filter(5차)와 웨이블릿 변환을 적용 후 출력된 신호형태입니다.
###################################################################################################
uid_1 [ j ] = denoise_wavelet( uid_1 [ j ] .values , wavelet='db8' ,
mode= 'soft' , wavelet_levels=5 , method='BayesShrink' , rescale_sigma='True' ) ###################################################################################################
uid_1 [ j ] = butter_lowpass_filter( uid_1 [ j ] .values , 0.15 , 3 , 5 )
이 부분에서 denoise_wavelet 을 먼저 실시하고 filter 을 적용시켰는데
filter 을 먼저 적용하고 denoise_wavelet 을 실시한것과 차이가 있습니다.
<차이점>
1. wavelet -> filter : 신호를 세분화하여 주파수 대역별로 필터링 진행
2. filter -> wavelet : 특정 주파수 대역의 성분을 세분화 진행.
wav_df = pd.DataFrame ()
for idx in tqdm ( df_98.uid.unique ()) :
uid_1 = df_98 [ df_98.uid==idx ]
col_lst = uid_1.columns [ 2 : -1 ] .tolist ()
for j in col_lst :
uid_1 [ j ] = lowpassfilter ( uid_1 [ j ] .values , 0.27 )
wav_df = pd.concat ([ wav_df , uid_1 ])
LPF 를 사용하면 이렇게 바뀝니다. 하지만 이번장에선 butterworth filter 을 기준으로 설명할 예정입니다.
#2. 데이터 광학밀도 변환(광필터)
광학 밀도 변환 (optical density) 광도에 의한 광학 밀도 : OD = log10(I_0/I_t) 평균 전송 강도에 의한 광학 밀도 변화 : delta_OD = -log10(I_T/I_average) 평균 광학에 대한 광학 밀도 변화의 수치 배열 형상의 각 채널의 밀도 또는 기준(예: 데이터)
#2-1. 광학 밀도 변환 (optical density)
광학 밀도 계산:
광학 밀도(OD)는 광학 측정을 통해 얻은 신호 강도를 기준 신호 강도로 나누고 로그 변환을 통해 계산됩니다. 광학 밀도 변환의 공식은 다음과 같습니다: OD=log10(I0/I) 여기서 I0 는 기준 신호 강도, I는 측정된 신호 강도입니다.
Baseline 보정:
데이터의 첫 100개의 값을 평균하여 baseline을 설정하고, 나머지 데이터에 대해 이를 기준으로 보정합니다.
100번째 이후의 데이터는 첫 100개의 평균 값으로 설정되며, 첫 100개 데이터는 누적 평균 값을 사용합니다.
헤모글로빈 값 계산:
변환된 OD 값을 사용하여 산소화 헤모글로빈 (HbO) 및 탈산소화 헤모글로빈 (HbR) 값을 Beer-Lambert 법칙을 적용하여 계산합니다.
od_df_98 = pd.DataFrame ()
for idx in tqdm( wav_df_98.uid.unique ()) :
raw = wav_df_98 [ wav_df_98.uid==idx ] .reset_index ( drop= True )
lst_730 = raw.columns [ 2 : 9 ] .tolist () #730 , 1~7채널
lst_850 = raw.columns [ 9 : 16 ] .tolist () #850 , 1~7채널
for jdx in lst_730 :
ch = jdx.split ( "_" )[ -1 ]
ch730_name = "A_730_{}" . format ( ch )
ch850_name = "A_850_{}" . format ( ch )
print ( 'process columns : {}, {}' . format ( ch730_name , ch850_name ), end='\r' )
data730 = raw [ ch730_name ]
data850 = raw [ ch850_name ]
base730 = data730.copy ()
base850 = data850.copy ()
base730 [ base730.index > 100 ] = np.mean( base730 [: 100 ])
for ix in base730.index[: 100 ] .tolist () :
base730 [ ix ] = float ( np.mean ( data730 [ 0 : ix+ 1 ]))
base850 [ base850.index > 100 ] = np.mean( base850 [: 100 ])
for ix in base850.index[: 100 ] .tolist () :
base850 [ ix ] = float ( np.mean ( data850 [ 0 : ix+ 1 ]))
raw [ ch730_name ] = np.log10( base730/data730 )
raw [ ch850_name ] = np.log10( base850/data850 )
od_df_98 = pd.concat ([ od_df_98 , raw ])
od_df_98 = od_df_98.reset_index ( drop= True )
e730_oxy = 0.1096 * 4
e730_deoxy = 0.3257 * 4
e850_oxy = 0.2899 * 4
e850_deoxy = 0.1965 * 4
e_c = ( e730_oxy*e850_deoxy-e850_oxy*e730_deoxy )
separation_730 = np.array ([ 4 , 3.5 , 3 , 3.5 , 4 , 3 , 1 ])
separation_850 = np.array ([ 4 , 3.5 , 3 , 3.5 , 4 , 3 , 1 ])
hb_df_98 = pd.DataFrame ()
for idx in od_df_98.uid.unique() :
od = od_df_98 [ od_df_98.uid==idx ] .reset_index ( drop= True )
lst_730 = raw.columns [ 2 : 9 ] .tolist () #730 , 1~7채널
lst_850 = raw.columns [ 9 : 16 ] .tolist () #850 , 1~7채널
for jdx in lst_730 :
ch = jdx.split ( "_" )[ -1 ]
ch730_name = "A_730_{}" . format ( ch )
ch850_name = "A_850_{}" . format ( ch )
hbo_name = "hbo_{}" . format ( str ( int ( ch ) + 1 ))
hbr_name = "hbr_{}" . format ( str ( int ( ch ) + 1 ))
OD730 = od [ ch730_name ]
OD850 = od [ ch850_name ]
hbo = ( OD730*e850_deoxy-OD850*e730_deoxy ) / ( e_c * separation_850 [ int ( ch )])
hbr = ( OD850*e730_oxy-OD730*e850_oxy ) / ( e_c * separation_850 [ int ( ch )])
od [ ch730_name ] = hbo
od [ ch850_name ] = hbr
od.rename ( columns = { ch730_name : hbo_name , ch850_name: hbr_name }, inplace = True )
hb_df_98 = pd.concat ([ hb_df_98 , od ])
hb_df_98 = hb_df_98.reset_index ( drop= True )
#2-2. 평균 전송 강도에 의한 광학 밀도 변환(델타 광학밀도변환)
델타 광학 밀도 계산:
델타 광학 밀도(ΔOD)는 측정된 신호 강도의 절대값을 기준으로 평균 강도를 나누고 로그 변환을 통해 계산됩니다. 델타 OD 변환의 공식은 다음과 같습니다: ΔOD=−log10(I / mean(I))
new_df = pd.DataFrame ()
for idx in tqdm ( wav_df.uid.unique ()) :
uid_1 = wav_df [ wav_df.uid==idx ]
uid1 = uid_1 [ uid_1.columns [ 2 : -1 ]]
intensities = uid1
means = np.mean ( np.absolute ( intensities ), axis = 1 )
means = np.expand_dims ( means , axis = 1 )
delta_od = -np.log10 ( np.absolute ( intensities ) /means )
lst_730 = delta_od.columns [: 7 ] .tolist ()
lst_850 = delta_od.columns [ 7 :] .tolist ()
hb_df = pd.DataFrame ()
for i in range ( len ( lst_730 )) :
red_mes_data = np.reshape (
delta_od [ lst_730 [ i ]] .values , ( delta_od [ lst_730 [ i ]] .values.shape [ 0 ], 1 ))
ir_mes_data = np.reshape (
delta_od [ lst_850 [ i ]] .values , ( delta_od [ lst_850 [ i ]] .values.shape [ 0 ], 1 ))
mes_data_shape = ir_mes_data.shape
oxy_red = 2.1838
oxy_ir = 7.0146 * 1.355
dxy_red = 10.2470
dxy_ir = 6.8501 * 1.355
mean_baseline_red = np.mean ( red_mes_data [ 0 : 100 ])
mean_baseline_ir = np.mean ( ir_mes_data [ 0 : 100 ])
pos = np.where (
red_mes_data*mean_baseline_red > 0 )
a_red = np.array ([
math.log ( mean_baseline_red/i [ 0 ]) if idx in pos [ 0 ] else 0 \
for idx , i in enumerate ( red_mes_data )
])
pos = np.where (
ir_mes_data*mean_baseline_ir > 0
)
a_ir = np.array ([
math.log ( mean_baseline_ir/i [ 0 ]) if idx in pos [ 0 ] else 0 \
for idx , i in enumerate ( ir_mes_data )
])
hb = np.zeros ( mes_data_shape )
hbo = np.zeros ( mes_data_shape )
hbt = np.zeros ( mes_data_shape )
####### Oxy Hb #######
if (( oxy_red*dxy_ir - oxy_ir*dxy_red ) != 0 ):
hbo = ( a_red*dxy_ir - a_ir*dxy_red ) / ( oxy_red*dxy_ir - oxy_ir*dxy_red )
####### DeOxy Hb #######
if (( dxy_red*oxy_ir - dxy_ir*oxy_red ) != 0 ):
hb = ( a_red*oxy_ir - a_ir*oxy_red ) / ( dxy_red*oxy_ir - dxy_ir*oxy_red )
# hbr 과 같은의미
hbt = hb+hbo
ch_name = str ( int ( lst_730 [ i ] .split ( '_' )[ -1 ]) + 1 )
hb_df [ 'hbo_' + ch_name ] = hbo
hb_df [ 'hbr_' + ch_name ] = hb
hb_df [ 'hbt_' + ch_name ] = hbt
# hb_df[['hbo_1','hbo_2','hbo_3','hbo_4','hbo_5','hbo_'
# ,'hbo_1']].plot(figsize=(15, 6),title="유저 {} ".format(idx))
hb_df.insert ( 0 , 'uid' , idx )
hb_df.insert ( 22 , 'label' , uid_1.Category.unique ()[ 0 ] )
new_df = pd.concat ([ new_df , hb_df ])
#3. 보정처리
상수 계산 및 보정:
각 채널에 대해 cNIRS_HbO_constant와 cNIRS_Hb_constant를 계산하여 보정된 HbO 및 HbR 값을 얻습니다.
보정된 값은 원래 값에서 기준 채널(hbo_7, hbr_7) 값을 상수만큼 곱한 값을 뺀 것입니다.
데이터프레임 업데이트:
보정된 값(cHbO, cHb)을 새로운 데이터프레임 chb_df에 저장합니다.
새로운 변수 lr01 생성:
pd1부터 pd6까지의 값은 각 채널 간의 HbO와 HbR의 차이입니다.
lr01은 (pd1 + pd2 + pd3) - (pd4 + pd5 + pd6)로 계산되며, 특정 채널 그룹 간의 차이를 나타냅니다.
dc_lr01은 lr01에서 평균 값을 뺀 값으로, DC 오프셋을 제거한 값입니다.
mean_chb는 모든 채널 간의 평균 차이를 나타냅니다.
신호의 정확한 분석 :
DC 오프셋이 있는 신호는 평균값이 0이 아니며, 이로 인해 신호의 실제 변동을 정확하게 분석하기 어려워집니다. 오프셋을 제거하면 신호의 평균값을 0으로 맞출 수 있어 신호의 변동성을 더 명확하게 볼 수 있습니다.
잡음 및 왜곡 감소 :
DC 오프셋이 존재하면 후속 신호 처리 단계에서 추가적인 잡음이나 왜곡을 유발할 수 있습니다. 예를 들어, 필터링 과정에서 DC 성분이 의도하지 않은 결과를 초래할 수 있습니다.
주파수 분석 :
주파수 도메인에서 신호를 분석할 때, DC 오프셋은 0Hz 성분으로 나타납니다. 이는 실제로 관심 있는 신호의 주파수 성분을 분석하는 데 방해가 됩니다. 오프셋을 제거하면 주파수 분석의 정확성이 향상됩니다.
신호 처리 기법의 효율성 :
많은 신호 처리 알고리즘은 입력 신호가 0 중심으로 분포할 때 최적으로 동작합니다. DC 오프셋이 있는 신호는 이러한 알고리즘의 성능을 저하시키고, 결과의 신뢰성을 떨어뜨릴 수 있습니다.
chb_df_98 = pd.DataFrame ()
for idx in hb_df_98.uid.unique () :
chb_df = pd.DataFrame ()
uid1 = hb_df_98 [ hb_df_98.uid==idx ]
hbo_lst = uid1 [ uid1.columns [ 2 : 8 ]] .columns
hbr_lst = uid1 [ uid1.columns [ 9 : 15 ]] .columns
for jdx in range ( len ( hbo_lst )) :
cNIRS_HbO_constant = np.dot ( uid1 [ hbo_lst [ jdx ]], uid1.hbo_7 ) / np.dot ( uid1 [ hbo_lst [ jdx ]], uid1.hbo_7 )
cNIRS_Hb_constant = np.dot ( uid1 [ hbr_lst [ jdx ]], uid1.hbr_7 ) / np.dot ( uid1 [ hbr_lst [ jdx ]], uid1.hbr_7 )
if cNIRS_HbO_constant> 0 :
cHbO = uid1 [ hbo_lst [ jdx ]] - uid1.hbo_7 * float ( cNIRS_HbO_constant )
else :
cHbO = uid1 [ hbo_lst [ jdx ]]
if cNIRS_Hb_constant > 0 :
cHb = uid1 [ hbr_lst [ jdx ]] - uid1.hbr_7 * float ( cNIRS_Hb_constant )
else :
cHb = uid1 [ hbr_lst [ jdx ]]
chb_df [ 'c' + hbo_lst [ jdx ]] = cHbO [: 3400 ]
chb_df [ 'c' + hbr_lst [ jdx ]] = cHb [: 3400 ]
chb_df.insert ( 0 , 'uid' , idx )
chb_df = chb_df [[ 'uid' , 'chbo_1' , 'chbo_2' , 'chbo_3' , 'chbo_4' , 'chbo_5' , 'chbo_6'
, 'chbr_1' , 'chbr_2' , 'chbr_3' , 'chbr_4' , 'chbr_5' , 'chbr_6' ]]
chb_df [ 'Category' ] = uid1.Category.unique ()[ 0 ]
chb_df_98 = pd.concat ([ chb_df_98 , chb_df ])
chb_df_98 = chb_df_98.reset_index ( drop= True )
pd1= chb_df_98.chbo_1 - chb_df_98.chbr_1
pd2= chb_df_98.chbo_2 - chb_df_98.chbr_2
pd3= chb_df_98.chbo_3 - chb_df_98.chbr_3
pd4= chb_df_98.chbo_4 - chb_df_98.chbr_4
pd5= chb_df_98.chbo_5 - chb_df_98.chbr_5
pd6= chb_df_98.chbo_6 - chb_df_98.chbr_6
lr01 = ( pd1 + pd2 + pd3 ) - ( pd4 + pd5 + pd6 )
chb_df_98.insert ( 13 , 'lr01' , lr01 )
chb_df_98.insert ( 14 , 'dc_lr01' , lr01 - np.mean ( lr01 )) # remove DC OFFSET
chb_df_98.insert ( 15 , 'mean_chb' , ( pd1 + pd2 + pd3 +pd4 + pd5 + pd6 ) / 6 )
uids = 14
plt.title ( "cHb Data - User name : {} - cHbo,cHbr - 4Channel" . format ( uids ), fontsize= 20 )
chb_df_98 [ chb_df_98.uid==uids ] .chbo_4.plot ( figsize= ( 12 , 5 ))
chb_df_98 [ chb_df_98.uid==uids ] .chbr_4.plot ( figsize= ( 12 , 5 ))
- dc offset 제거 전후 비교 plot
chb_df_98 [ chb_df_98.uid== 1 ] .dc_lr01.plot ( figsize= ( 12 , 5 ))
chb_df_98 [ chb_df_98.uid== 1 ] .lr01.plot ( figsize= ( 12 , 5 ))
#4. nan 값 제거 후 최종 신호(헤모글로빈 농도값) plot
# NaN값이 있는 유저 확인 후 제거
null_uid = chb_df_98 [ chb_df_98.lr01.isna ()] .uid.unique ()
print ( '98 - Nan user Index : {}' . format ( null_uid ))
for i in null_uid :
chb_df_98 = chb_df_98 [ chb_df_98.uid != i ]
null_uid = chb_df_35 [ chb_df_35.lr01.isna ()] .uid.unique ()
print ( '35 - Nan user Index : {}' . format ( null_uid ))
for i in null_uid :
chb_df_35 = chb_df_35 [ chb_df_35.uid != i ]
uids = 14
plt.title ( "cHb Data - User name : {} - lr01" . format ( uids ), fontsize= 20 )
chb_df_98 [ chb_df_98.uid==uids ] .lr01.plot ( figsize= ( 12 , 5 ))
<결론>
- 신호처리를 위해 무수한 필터를 사용한다.
- 이 페이지에선 butterworth filter 을 사용했으며, 신호가 둥글게 변환된다.
- 광학밀도변환 - np.log10(mean(ch) / ch) 을 사용하였음.
- 상대적 헤모글로빈 농도 변환 - (OD730*e850_deoxy-OD850*e730_deoxy)/(sep_L*e_c) 을 사용하였음.
Q. 왜 Low Pass Filter 을 사용하였나요?
A. fNIRS 신호가 저주파수 대역의 신호라서 사용했습니다.
Q. 다른 필터는 사용안했나요?
A. BFP, kalman filter, gaussian filter 모두 사용했습니다만,
BFP는 cutoff feq 값의 설정이 애매모호합니다. 생체신호의 일반적인 노이즈는
- 호흡 진동 : 0.3Hz - 심장 진동 : 0.89Hz - 동맥혈압 진동 : 0.14Hz
이 존재하지만 사람마다 진동이 다르기 때문에 설정해주기 어려웠습니다.
kalman filter 는 시간에 따른 변화량이 특이하게 변하는 생체신호상 적용하면 노이즈가 있는 신호처럼 출력되어 사용하지 못했습니다.
gaussian filter 를 사용했을 때, 특정 신호가 아예 날아가버리는 경우가 생겨서 사용하지 않았습니다.
(코드에서 사용한 수식들은 논문에서 제시한 공식을 바탕으로 만들어진 수식입니다.)